[hornetq-commits] JBoss hornetq SVN: r8036 - in trunk: src/config and 31 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Fri Oct 2 11:47:10 EDT 2009
Author: timfox
Date: 2009-10-02 11:47:08 -0400 (Fri, 02 Oct 2009)
New Revision: 8036
Added:
trunk/src/main/org/hornetq/core/client/impl/FailoverManager.java
trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java
Removed:
trunk/src/main/org/hornetq/core/client/impl/ConnectionManager.java
trunk/src/main/org/hornetq/core/client/impl/ConnectionManagerImpl.java
Modified:
trunk/docs/user-manual/en/appserver-integration.xml
trunk/src/config/common/schema/hornetq-jms.xsd
trunk/src/config/ra.xml
trunk/src/main/org/hornetq/core/client/ClientSessionFactory.java
trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java
trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java
trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java
trunk/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java
trunk/src/main/org/hornetq/core/server/HornetQServer.java
trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
trunk/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java
trunk/src/main/org/hornetq/jms/server/JMSServerManager.java
trunk/src/main/org/hornetq/jms/server/config/ConnectionFactoryConfiguration.java
trunk/src/main/org/hornetq/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java
trunk/src/main/org/hornetq/jms/server/impl/JMSServerDeployer.java
trunk/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
trunk/src/main/org/hornetq/jms/server/management/ConnectionFactoryControl.java
trunk/src/main/org/hornetq/jms/server/management/JMSServerControl.java
trunk/src/main/org/hornetq/jms/server/management/impl/JMSConnectionFactoryControlImpl.java
trunk/src/main/org/hornetq/jms/server/management/impl/JMSServerControlImpl.java
trunk/src/main/org/hornetq/ra/ConnectionFactoryProperties.java
trunk/src/main/org/hornetq/ra/HornetQRAManagedConnectionFactory.java
trunk/src/main/org/hornetq/ra/HornetQRAMetaData.java
trunk/src/main/org/hornetq/ra/HornetQResourceAdapter.java
trunk/tests/config/hornetq-jms-for-JMSServerDeployerTest.xml
trunk/tests/jms-tests/src/org/hornetq/jms/tests/CTSMiscellaneousTest.java
trunk/tests/jms-tests/src/org/hornetq/jms/tests/JMSTestCase.java
trunk/tests/jms-tests/src/org/hornetq/jms/tests/tools/container/LocalTestServer.java
trunk/tests/src/org/hornetq/tests/integration/client/SessionFactoryTest.java
trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/ReattachTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/FloodServerTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/HornetQConnectionFactoryTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/ManualReconnectionToSingleServerTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/client/PreACKJMSTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/client/ResendTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/connection/CloseConnectionOnGCTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/divert/DivertAndACKClientTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/server/JMSServerDeployerTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlUsingJMSTest.java
trunk/tests/src/org/hornetq/tests/integration/remoting/PingTest.java
trunk/tests/src/org/hornetq/tests/unit/ra/ResourceAdapterTest.java
trunk/tests/src/org/hornetq/tests/util/JMSTestBase.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-162
Modified: trunk/docs/user-manual/en/appserver-integration.xml
===================================================================
--- trunk/docs/user-manual/en/appserver-integration.xml 2009-10-02 13:56:28 UTC (rev 8035)
+++ trunk/docs/user-manual/en/appserver-integration.xml 2009-10-02 15:47:08 UTC (rev 8036)
@@ -541,13 +541,8 @@
<entry>AutoGroup</entry>
<entry>boolean</entry>
<entry>If true then auto group messages</entry>
- </row>
+ </row>
<row>
- <entry>MaxConnections</entry>
- <entry>integer</entry>
- <entry>The max number of connections.</entry>
- </row>
- <row>
<entry>PreAcknowledge</entry>
<entry>boolean</entry>
<entry>Whether to pre acknowledge messages before sending to
Modified: trunk/src/config/common/schema/hornetq-jms.xsd
===================================================================
--- trunk/src/config/common/schema/hornetq-jms.xsd 2009-10-02 13:56:28 UTC (rev 8035)
+++ trunk/src/config/common/schema/hornetq-jms.xsd 2009-10-02 15:47:08 UTC (rev 8036)
@@ -87,9 +87,6 @@
<xsd:element name="auto-group-id" type="xsd:boolean"
maxOccurs="1" minOccurs="0">
</xsd:element>
- <xsd:element name="max-connections" type="xsd:int"
- maxOccurs="1" minOccurs="0">
- </xsd:element>
<xsd:element name="pre-acknowledge" type="xsd:boolean"
maxOccurs="1" minOccurs="0">
</xsd:element>
Modified: trunk/src/config/ra.xml
===================================================================
--- trunk/src/config/ra.xml 2009-10-02 13:56:28 UTC (rev 8035)
+++ trunk/src/config/ra.xml 2009-10-02 15:47:08 UTC (rev 8036)
@@ -203,8 +203,7 @@
<config-property-value></config-property-value>
</config-property>
<config-property>
- <description>The max connections</description>
- <config-property-name>MaxConnections</config-property-name>
+ <description>The max connections</description>
<config-property-type>java.lang.Integer</config-property-type>
<config-property-value></config-property-value>
</config-property>
Modified: trunk/src/main/org/hornetq/core/client/ClientSessionFactory.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/ClientSessionFactory.java 2009-10-02 13:56:28 UTC (rev 8035)
+++ trunk/src/main/org/hornetq/core/client/ClientSessionFactory.java 2009-10-02 15:47:08 UTC (rev 8036)
@@ -70,10 +70,6 @@
void setCallTimeout(long callTimeout);
- int getMaxConnections();
-
- void setMaxConnections(int maxConnections);
-
int getMinLargeMessageSize();
void setMinLargeMessageSize(int minLargeMessageSize);
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2009-10-02 13:56:28 UTC (rev 8035)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2009-10-02 15:47:08 UTC (rev 8036)
@@ -87,8 +87,6 @@
public static final long DEFAULT_CALL_TIMEOUT = 30000;
- public static final int DEFAULT_MAX_CONNECTIONS = 8;
-
public static final int DEFAULT_ACK_BATCH_SIZE = 1024 * 1024;
public static final boolean DEFAULT_PRE_ACKNOWLEDGE = false;
@@ -120,7 +118,7 @@
// Attributes
// -----------------------------------------------------------------------------------
- private final Map<Pair<TransportConfiguration, TransportConfiguration>, ConnectionManager> connectionManagerMap = new LinkedHashMap<Pair<TransportConfiguration, TransportConfiguration>, ConnectionManager>();
+ private final Map<Pair<TransportConfiguration, TransportConfiguration>, FailoverManager> failoverManagerMap = new LinkedHashMap<Pair<TransportConfiguration, TransportConfiguration>, FailoverManager>();
private volatile boolean receivedBroadcast = false;
@@ -132,7 +130,7 @@
private ConnectionLoadBalancingPolicy loadBalancingPolicy;
- private ConnectionManager[] connectionManagerArray;
+ private FailoverManager[] failoverManagerArray;
private boolean readOnly;
@@ -156,8 +154,6 @@
private long callTimeout;
- private int maxConnections;
-
private int minLargeMessageSize;
private int consumerWindowSize;
@@ -284,11 +280,10 @@
{
for (Pair<TransportConfiguration, TransportConfiguration> pair : staticConnectors)
{
- ConnectionManager cm = new ConnectionManagerImpl(this,
+ FailoverManager cm = new FailoverManagerImpl(this,
pair.a,
pair.b,
- failoverOnServerShutdown,
- maxConnections,
+ failoverOnServerShutdown,
callTimeout,
clientFailureCheckPeriod,
connectionTTL,
@@ -301,10 +296,10 @@
scheduledThreadPool,
interceptors);
- connectionManagerMap.put(pair, cm);
+ failoverManagerMap.put(pair, cm);
}
- updateConnectionManagerArray();
+ updatefailoverManagerArray();
}
else
{
@@ -328,8 +323,6 @@
callTimeout = DEFAULT_CALL_TIMEOUT;
- maxConnections = DEFAULT_MAX_CONNECTIONS;
-
minLargeMessageSize = DEFAULT_MIN_LARGE_MESSAGE_SIZE;
consumerWindowSize = DEFAULT_CONSUMER_WINDOW_SIZE;
@@ -464,17 +457,6 @@
this.callTimeout = callTimeout;
}
- public synchronized int getMaxConnections()
- {
- return maxConnections;
- }
-
- public synchronized void setMaxConnections(int maxConnections)
- {
- checkWrite();
- this.maxConnections = maxConnections;
- }
-
public synchronized int getMinLargeMessageSize()
{
return minLargeMessageSize;
@@ -833,9 +815,9 @@
{
int num = 0;
- for (ConnectionManager connectionManager : connectionManagerMap.values())
+ for (FailoverManager failoverManager : failoverManagerMap.values())
{
- num += connectionManager.numSessions();
+ num += failoverManager.numSessions();
}
return num;
@@ -845,9 +827,9 @@
{
int num = 0;
- for (ConnectionManager connectionManager : connectionManagerMap.values())
+ for (FailoverManager failoverManager : failoverManagerMap.values())
{
- num += connectionManager.numConnections();
+ num += failoverManager.numConnections();
}
return num;
@@ -872,12 +854,12 @@
}
}
- for (ConnectionManager connectionManager : connectionManagerMap.values())
+ for (FailoverManager failoverManager : failoverManagerMap.values())
{
- connectionManager.causeExit();
+ failoverManager.causeExit();
}
- connectionManagerMap.clear();
+ failoverManagerMap.clear();
if (!useGlobalPools)
{
@@ -926,15 +908,15 @@
connectorSet.add(entry.getConnectorPair());
}
- Iterator<Map.Entry<Pair<TransportConfiguration, TransportConfiguration>, ConnectionManager>> iter = connectionManagerMap.entrySet()
+ Iterator<Map.Entry<Pair<TransportConfiguration, TransportConfiguration>, FailoverManager>> iter = failoverManagerMap.entrySet()
.iterator();
while (iter.hasNext())
{
- Map.Entry<Pair<TransportConfiguration, TransportConfiguration>, ConnectionManager> entry = iter.next();
+ Map.Entry<Pair<TransportConfiguration, TransportConfiguration>, FailoverManager> entry = iter.next();
if (!connectorSet.contains(entry.getKey()))
{
- // ConnectionManager no longer there - we should remove it
+ // failoverManager no longer there - we should remove it
iter.remove();
}
@@ -942,15 +924,14 @@
for (Pair<TransportConfiguration, TransportConfiguration> connectorPair : connectorSet)
{
- if (!connectionManagerMap.containsKey(connectorPair))
+ if (!failoverManagerMap.containsKey(connectorPair))
{
- // Create a new ConnectionManager
+ // Create a new failoverManager
- ConnectionManager connectionManager = new ConnectionManagerImpl(this,
+ FailoverManager failoverManager = new FailoverManagerImpl(this,
connectorPair.a,
connectorPair.b,
- failoverOnServerShutdown,
- maxConnections,
+ failoverOnServerShutdown,
callTimeout,
clientFailureCheckPeriod,
connectionTTL,
@@ -963,16 +944,16 @@
scheduledThreadPool,
interceptors);
- connectionManagerMap.put(connectorPair, connectionManager);
+ failoverManagerMap.put(connectorPair, failoverManager);
}
}
- updateConnectionManagerArray();
+ updatefailoverManagerArray();
}
- public ConnectionManager[] getConnectionManagers()
+ public FailoverManager[] getFailoverManagers()
{
- return connectionManagerArray;
+ return failoverManagerArray;
}
// Protected ------------------------------------------------------------------------------
@@ -1035,11 +1016,11 @@
synchronized (this)
{
- int pos = loadBalancingPolicy.select(connectionManagerArray.length);
+ int pos = loadBalancingPolicy.select(failoverManagerArray.length);
- ConnectionManager connectionManager = connectionManagerArray[pos];
+ FailoverManager failoverManager = failoverManagerArray[pos];
- ClientSession session = connectionManager.createSession(username,
+ ClientSession session = failoverManager.createSession(username,
password,
xa,
autoCommitSends,
@@ -1082,11 +1063,11 @@
}
}
- private synchronized void updateConnectionManagerArray()
+ private synchronized void updatefailoverManagerArray()
{
- connectionManagerArray = new ConnectionManager[connectionManagerMap.size()];
+ failoverManagerArray = new FailoverManager[failoverManagerMap.size()];
- connectionManagerMap.values().toArray(connectionManagerArray);
+ failoverManagerMap.values().toArray(failoverManagerArray);
}
}
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2009-10-02 13:56:28 UTC (rev 8035)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2009-10-02 15:47:08 UTC (rev 8036)
@@ -107,7 +107,7 @@
// Attributes ----------------------------------------------------------------------------
- private final ConnectionManager connectionManager;
+ private final FailoverManager connectionManager;
private final String name;
@@ -177,7 +177,7 @@
// Constructors ----------------------------------------------------------------------------
- public ClientSessionImpl(final ConnectionManager connectionManager,
+ public ClientSessionImpl(final FailoverManager connectionManager,
final String name,
final String username,
final String password,
@@ -785,7 +785,7 @@
{
return true;
}
-
+
boolean ok = false;
// We lock the channel to prevent any packets to be added to the resend
@@ -795,7 +795,7 @@
try
{
channel.transferConnection(backupConnection);
-
+
backupConnection.syncIDGeneratorSequence(remotingConnection.getIDGeneratorSequence());
remotingConnection = backupConnection;
@@ -807,7 +807,7 @@
ReattachSessionResponseMessage response = (ReattachSessionResponseMessage)channel1.sendBlocking(request);
if (response.isSessionFound())
- {
+ {
channel.replayCommands(response.getLastReceivedCommandID(), channel.getID());
ok = true;
@@ -862,7 +862,7 @@
}
boolean ok = false;
-
+
// Need to stop all consumers outside the lock
for (ClientConsumerInternal consumer : consumers.values())
{
@@ -1000,7 +1000,7 @@
channel.returnBlocking();
}
- public ConnectionManager getConnectionManager()
+ public FailoverManager getConnectionManager()
{
return connectionManager;
}
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java 2009-10-02 13:56:28 UTC (rev 8035)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java 2009-10-02 15:47:08 UTC (rev 8036)
@@ -65,7 +65,7 @@
void setForceNotSameRM(boolean force);
- ConnectionManager getConnectionManager();
+ FailoverManager getConnectionManager();
void workDone();
}
Deleted: trunk/src/main/org/hornetq/core/client/impl/ConnectionManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ConnectionManager.java 2009-10-02 13:56:28 UTC (rev 8035)
+++ trunk/src/main/org/hornetq/core/client/impl/ConnectionManager.java 2009-10-02 15:47:08 UTC (rev 8036)
@@ -1,63 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.client.impl;
-
-import org.hornetq.core.client.ClientSession;
-import org.hornetq.core.exception.HornetQException;
-import org.hornetq.core.remoting.FailureListener;
-import org.hornetq.core.remoting.RemotingConnection;
-
-/**
- * A ConnectionManager
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- * Created 27 Nov 2008 18:45:46
- *
- *
- */
-public interface ConnectionManager
-{
- ClientSession createSession(final String username,
- final String password,
- final boolean xa,
- final boolean autoCommitSends,
- final boolean autoCommitAcks,
- final boolean preAcknowledge,
- final int ackBatchSize,
- final boolean cacheLargeMessageClient,
- final int minLargeMessageSize,
- final boolean blockOnAcknowledge,
- final boolean autoGroup,
- final int producerWindowSize,
- final int consumerWindowSize,
- final int producerMaxRate,
- final int consumerMaxRate,
- final boolean blockOnNonPersistentSend,
- final boolean blockOnPersistentSend) throws HornetQException;
-
- void removeSession(final ClientSessionInternal session);
-
- int numConnections();
-
- int numSessions();
-
- RemotingConnection getConnection(final int initialRefCount);
-
- void addFailureListener(FailureListener listener);
-
- boolean removeFailureListener(FailureListener listener);
-
- void causeExit();
-}
Deleted: trunk/src/main/org/hornetq/core/client/impl/ConnectionManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ConnectionManagerImpl.java 2009-10-02 13:56:28 UTC (rev 8035)
+++ trunk/src/main/org/hornetq/core/client/impl/ConnectionManagerImpl.java 2009-10-02 15:47:08 UTC (rev 8036)
@@ -1,1351 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.client.impl;
-
-import java.lang.ref.WeakReference;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
-
-import org.hornetq.core.client.ClientSession;
-import org.hornetq.core.client.ClientSessionFactory;
-import org.hornetq.core.config.TransportConfiguration;
-import org.hornetq.core.exception.HornetQException;
-import org.hornetq.core.logging.Logger;
-import org.hornetq.core.remoting.Channel;
-import org.hornetq.core.remoting.ChannelHandler;
-import org.hornetq.core.remoting.FailureListener;
-import org.hornetq.core.remoting.Interceptor;
-import org.hornetq.core.remoting.Packet;
-import org.hornetq.core.remoting.RemotingConnection;
-import org.hornetq.core.remoting.impl.AbstractBufferHandler;
-import org.hornetq.core.remoting.impl.RemotingConnectionImpl;
-import org.hornetq.core.remoting.impl.wireformat.CreateSessionMessage;
-import org.hornetq.core.remoting.impl.wireformat.CreateSessionResponseMessage;
-import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
-import org.hornetq.core.remoting.impl.wireformat.Ping;
-import org.hornetq.core.remoting.spi.Connection;
-import org.hornetq.core.remoting.spi.ConnectionLifeCycleListener;
-import org.hornetq.core.remoting.spi.Connector;
-import org.hornetq.core.remoting.spi.ConnectorFactory;
-import org.hornetq.core.remoting.spi.HornetQBuffer;
-import org.hornetq.core.version.Version;
-import org.hornetq.utils.ConcurrentHashSet;
-import org.hornetq.utils.ExecutorFactory;
-import org.hornetq.utils.OrderedExecutorFactory;
-import org.hornetq.utils.UUIDGenerator;
-import org.hornetq.utils.VersionLoader;
-
-/**
- * A ConnectionManagerImpl
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- * Created 27 Nov 2008 18:46:06
- *
- */
-public class ConnectionManagerImpl implements ConnectionManager, ConnectionLifeCycleListener
-{
- // Constants
- // ------------------------------------------------------------------------------------
-
- private static final long serialVersionUID = 2512460695662741413L;
-
- private static final Logger log = Logger.getLogger(ConnectionManagerImpl.class);
-
- // Attributes
- // -----------------------------------------------------------------------------------
-
- //We hold this reference for GC reasons
- private final ClientSessionFactory sessionFactory;
-
- private final TransportConfiguration connectorConfig;
-
- private final TransportConfiguration backupConfig;
-
- private ConnectorFactory connectorFactory;
-
- private Map<String, Object> transportParams;
-
- private ConnectorFactory backupConnectorFactory;
-
- private Map<String, Object> backupTransportParams;
-
- private final int maxConnections;
-
- private final long callTimeout;
-
- private final long clientFailureCheckPeriod;
-
- private final long connectionTTL;
-
- private final Map<ClientSessionInternal, RemotingConnection> sessions = new HashMap<ClientSessionInternal, RemotingConnection>();
-
- private final Object exitLock = new Object();
-
- private final Object createSessionLock = new Object();
-
- private boolean inCreateSession;
-
- private final Object failoverLock = new Object();
-
- private final ExecutorFactory orderedExecutorFactory;
-
- private final ExecutorService threadPool;
-
- private final ScheduledExecutorService scheduledThreadPool;
-
- private final Map<Object, ConnectionEntry> connections = Collections.synchronizedMap(new LinkedHashMap<Object, ConnectionEntry>());
-
- private int refCount;
-
- private Iterator<ConnectionEntry> mapIterator;
-
- private final long retryInterval;
-
- private final double retryIntervalMultiplier; // For exponential backoff
-
- private final long maxRetryInterval;
-
- private final int reconnectAttempts;
-
- private boolean failoverOnServerShutdown;
-
- private Set<FailureListener> listeners = new ConcurrentHashSet<FailureListener>();
-
- private Connector connector;
-
- private Future<?> pingerFuture;
-
- private PingRunnable pingRunnable;
-
- private volatile boolean exitLoop;
-
- private final List<Interceptor> interceptors;
-
- private final boolean useReattach;
-
- // debug
-
- private static Map<TransportConfiguration, Set<RemotingConnection>> debugConns;
-
- private static boolean debug = false;
-
- public static void enableDebug()
- {
- debug = true;
-
- debugConns = new ConcurrentHashMap<TransportConfiguration, Set<RemotingConnection>>();
- }
-
- // Static
- // ---------------------------------------------------------------------------------------
-
- // Constructors
- // ---------------------------------------------------------------------------------
-
- public ConnectionManagerImpl(final ClientSessionFactory sessionFactory,
- final TransportConfiguration connectorConfig,
- final TransportConfiguration backupConfig,
- final boolean failoverOnServerShutdown,
- final int maxConnections,
- final long callTimeout,
- final long clientFailureCheckPeriod,
- final long connectionTTL,
- final long retryInterval,
- final double retryIntervalMultiplier,
- final long maxRetryInterval,
- final int reconnectAttempts,
- final boolean useReattach,
- final ExecutorService threadPool,
- final ScheduledExecutorService scheduledThreadPool,
- final List<Interceptor> interceptors)
- {
- this.sessionFactory = sessionFactory;
-
- this.connectorConfig = connectorConfig;
-
- this.backupConfig = backupConfig;
-
- this.failoverOnServerShutdown = failoverOnServerShutdown;
-
- connectorFactory = instantiateConnectorFactory(connectorConfig.getFactoryClassName());
-
- transportParams = connectorConfig.getParams();
-
- if (backupConfig != null)
- {
- backupConnectorFactory = instantiateConnectorFactory(backupConfig.getFactoryClassName());
-
- backupTransportParams = backupConfig.getParams();
- }
- else
- {
- backupConnectorFactory = null;
-
- backupTransportParams = null;
- }
-
- this.maxConnections = maxConnections;
-
- this.callTimeout = callTimeout;
-
- this.clientFailureCheckPeriod = clientFailureCheckPeriod;
-
- this.connectionTTL = connectionTTL;
-
- this.retryInterval = retryInterval;
-
- this.retryIntervalMultiplier = retryIntervalMultiplier;
-
- this.maxRetryInterval = maxRetryInterval;
-
- this.reconnectAttempts = reconnectAttempts;
-
- this.useReattach = useReattach;
-
- this.scheduledThreadPool = scheduledThreadPool;
-
- this.threadPool = threadPool;
-
- this.orderedExecutorFactory = new OrderedExecutorFactory(threadPool);
-
- this.interceptors = interceptors;
- }
-
- // ConnectionLifeCycleListener implementation --------------------------------------------------
-
- public void connectionCreated(final Connection connection)
- {
- }
-
- public void connectionDestroyed(final Object connectionID)
- {
- failConnection(connectionID, new HornetQException(HornetQException.NOT_CONNECTED, "Channel disconnected"));
- }
-
- public void connectionException(final Object connectionID, final HornetQException me)
- {
- failConnection(connectionID, me);
- }
-
- // ConnectionManager implementation ------------------------------------------------------------------
-
- public ClientSession createSession(final String username,
- final String password,
- final boolean xa,
- final boolean autoCommitSends,
- final boolean autoCommitAcks,
- final boolean preAcknowledge,
- final int ackBatchSize,
- final boolean cacheLargeMessageClient,
- final int minLargeMessageSize,
- final boolean blockOnAcknowledge,
- final boolean autoGroup,
- final int producerWindowSize,
- final int consumerWindowSize,
- final int producerMaxRate,
- final int consumerMaxRate,
- final boolean blockOnNonPersistentSend,
- final boolean blockOnPersistentSend) throws HornetQException
- {
- synchronized (createSessionLock)
- {
- String name = UUIDGenerator.getInstance().generateSimpleStringUUID().toString();
-
- boolean retry = false;
- do
- {
- Version clientVersion = VersionLoader.getVersion();
-
- RemotingConnection connection = null;
-
- Lock lock = null;
-
- try
- {
- Channel channel1;
-
- synchronized (failoverLock)
- {
- connection = getConnectionWithRetry(1, reconnectAttempts);
-
- if (connection == null)
- {
- if (exitLoop)
- {
- return null;
- }
- // This can happen if the connection manager gets exitLoop - e.g. the server gets shut down
-
- throw new HornetQException(HornetQException.NOT_CONNECTED,
- "Unable to connect to server using configuration " + connectorConfig);
-
- }
-
- channel1 = connection.getChannel(1, -1, false);
-
- // Lock it - this must be done while the failoverLock is held
- channel1.getLock().lock();
-
- lock = channel1.getLock();
- } // We can now release the failoverLock
-
- // We now set a flag saying createSession is executing
- synchronized (exitLock)
- {
- inCreateSession = true;
- }
-
- long sessionChannelID = connection.generateChannelID();
-
- Packet request = new CreateSessionMessage(name,
- sessionChannelID,
- clientVersion.getIncrementingVersion(),
- username,
- password,
- minLargeMessageSize,
- xa,
- autoCommitSends,
- autoCommitAcks,
- preAcknowledge,
- producerWindowSize);
-
- Packet pResponse;
- try
- {
- pResponse = channel1.sendBlocking(request);
- }
- catch (HornetQException e)
- {
- if (e.getCode() == HornetQException.UNBLOCKED)
- {
- // This means the thread was blocked on create session and failover unblocked it
- // so failover could occur
-
- // So we just need to return our connections and flag for retry
-
- returnConnection(connection.getID());
-
- retry = true;
-
- continue;
- }
- else
- {
- throw e;
- }
- }
-
- CreateSessionResponseMessage response = (CreateSessionResponseMessage)pResponse;
-
- Channel sessionChannel = connection.getChannel(sessionChannelID,
- producerWindowSize,
- producerWindowSize != -1);
-
- ClientSessionInternal session = new ClientSessionImpl(this,
- name,
- username,
- password,
- xa,
- autoCommitSends,
- autoCommitAcks,
- preAcknowledge,
- blockOnAcknowledge,
- autoGroup,
- ackBatchSize,
- consumerWindowSize,
- consumerMaxRate,
- producerWindowSize,
- producerMaxRate,
- blockOnNonPersistentSend,
- blockOnPersistentSend,
- cacheLargeMessageClient,
- minLargeMessageSize,
- connection,
- response.getServerVersion(),
- sessionChannel,
- orderedExecutorFactory.getExecutor());
-
- sessions.put(session, connection);
-
- ChannelHandler handler = new ClientSessionPacketHandler(session, sessionChannel);
-
- sessionChannel.setHandler(handler);
-
- return new DelegatingSession(session);
- }
- catch (Throwable t)
- {
- if (lock != null)
- {
- lock.unlock();
-
- lock = null;
- }
-
- if (connection != null)
- {
- returnConnection(connection.getID());
- }
-
- if (t instanceof HornetQException)
- {
- throw (HornetQException)t;
- }
- else
- {
- HornetQException me = new HornetQException(HornetQException.INTERNAL_ERROR,
- "Failed to create session");
-
- me.initCause(t);
-
- throw me;
- }
- }
- finally
- {
- if (lock != null)
- {
- lock.unlock();
- }
-
- // Execution has finished so notify any failover thread that may be waiting for us to be done
- synchronized (exitLock)
- {
- inCreateSession = false;
-
- exitLock.notify();
- }
- }
- }
- while (retry);
- }
-
- // Should never get here
- throw new IllegalStateException("Oh my God it's full of stars!");
- }
-
- // Must be synchronized to prevent it happening concurrently with failover which can lead to
- // inconsistencies
- public void removeSession(final ClientSessionInternal session)
- {
- // TODO - can we simplify this locking?
- synchronized (createSessionLock)
- {
- synchronized (failoverLock)
- {
- sessions.remove(session);
-
- returnConnection(session.getConnection().getID());
- }
- }
- }
-
- public synchronized int numConnections()
- {
- return connections.size();
- }
-
- public int numSessions()
- {
- return sessions.size();
- }
-
- public void addFailureListener(FailureListener listener)
- {
- listeners.add(listener);
- }
-
- public boolean removeFailureListener(FailureListener listener)
- {
- return listeners.remove(listener);
- }
-
- public void causeExit()
- {
- exitLoop = true;
- }
-
- // Public
- // ---------------------------------------------------------------------------------------
-
- private volatile boolean stopPingingAfterOne;
-
- public void stopPingingAfterOne()
- {
- this.stopPingingAfterOne = true;
- }
-
- // Protected
- // ------------------------------------------------------------------------------------
-
- // Package Private
- // ------------------------------------------------------------------------------
-
- // Private
- // --------------------------------------------------------------------------------------
-
- private void handleConnectionFailure(final HornetQException me, final Object connectionID)
- {
- failoverOrReconnect(me, connectionID);
- }
-
- private void failoverOrReconnect(final HornetQException me, final Object connectionID)
- {
- boolean done = false;
-
- synchronized (failoverLock)
- {
- if (connectionID != null && !connections.containsKey(connectionID))
- {
- // We already failed over/reconnected - probably the first failure came in, all the connections were failed
- // over then a async connection exception or disconnect
- // came in for one of the already exitLoop connections, so we return true - we don't want to call the
- // listeners again
-
- return;
- }
-
- // Now get locks on all channel 1s, whilst holding the failoverLock - this makes sure
- // There are either no threads executing in createSession, or one is blocking on a createSession
- // result.
-
- // Then interrupt the channel 1 that is blocking (could just interrupt them all)
-
- // Then release all channel 1 locks - this allows the createSession to exit the monitor
-
- // Then get all channel 1 locks again - this ensures the any createSession thread has executed the section and
- // returned all its connections to the connection manager (the code to return connections to connection manager
- // must be inside the lock
-
- // Then perform failover
-
- // Then release failoverLock
-
- // The other side of the bargain - during createSession:
- // The calling thread must get the failoverLock and get its' connections when this is locked.
- // While this is still locked it must then get the channel1 lock
- // It can then release the failoverLock
- // It should catch HornetQException.INTERRUPTED in the call to channel.sendBlocking
- // It should then return its connections, with channel 1 lock still held
- // It can then release the channel 1 lock, and retry (which will cause locking on failoverLock
- // until failover is complete
-
- boolean serverShutdown = me.getCode() == HornetQException.DISCONNECTED;
-
- //We will try to failover if there is a backup connector factory, but we don't do this if the server
- //has been shutdown cleanly unless failoverOnServerShutdown is true
- boolean attemptFailover = backupConnectorFactory != null && (failoverOnServerShutdown || !serverShutdown);
-
- boolean attemptReconnect;
-
- if (attemptFailover)
- {
- attemptReconnect = false;
- }
- else
- {
- attemptReconnect = reconnectAttempts != 0;
- }
-
- if (attemptFailover || attemptReconnect)
- {
- lockAllChannel1s();
-
- final boolean needToInterrupt;
-
- synchronized (exitLock)
- {
- needToInterrupt = inCreateSession;
- }
-
- unlockAllChannel1s();
-
- if (needToInterrupt)
- {
- // Forcing return all channels won't guarantee that any blocked thread will return immediately
- // So we need to wait for it
- forceReturnAllChannel1s();
-
- // Now we need to make sure that the thread has actually exited and returned it's connections
- // before failover occurs
-
- synchronized (exitLock)
- {
- while (inCreateSession)
- {
- try
- {
- exitLock.wait(5000);
- }
- catch (InterruptedException e)
- {
- }
- }
- }
- }
-
- // Now we absolutely know that no threads are executing in or blocked in createSession, and no
- // more will execute it until failover is complete
-
- // So.. do failover / reconnection
-
- Set<RemotingConnection> oldConnections = new HashSet<RemotingConnection>();
-
- for (ConnectionEntry entry : connections.values())
- {
- oldConnections.add(entry.connection);
- }
-
- connections.clear();
-
- refCount = 0;
-
- mapIterator = null;
-
- try
- {
- connector.close();
- }
- catch (Exception ignore)
- {
- }
-
- connector = null;
-
- if (attemptFailover)
- {
- // Now try failing over to backup
-
- connectorFactory = backupConnectorFactory;
-
- transportParams = backupTransportParams;
-
- backupConnectorFactory = null;
-
- backupTransportParams = null;
-
- done = reattachSessions(reconnectAttempts == -1 ? -1 : reconnectAttempts + 1, false);
- }
- else
- {
- done = reattachSessions(reconnectAttempts, useReattach);
- }
-
- if (done)
- {
- // Destroy the old connections
- for (RemotingConnection connection : oldConnections)
- {
- connection.destroy();
- }
- }
- else
- {
- for (RemotingConnection connection : oldConnections)
- {
- connection.destroy();
- }
-
- closeAllConnections();
- }
- }
- else
- {
- closeAllConnections();
- }
-
- //We always call the failure listeners
- callFailureListeners(me);
- }
- }
-
- private void closeAllConnections()
- {
- refCount = 0;
- mapIterator = null;
- checkCloseConnections();
- }
-
- private void callFailureListeners(final HornetQException me)
- {
- final List<FailureListener> listenersClone = new ArrayList<FailureListener>(listeners);
-
- for (final FailureListener listener : listenersClone)
- {
- try
- {
- listener.connectionFailed(me);
- }
- catch (final Throwable t)
- {
- // Failure of one listener to execute shouldn't prevent others
- // from
- // executing
- log.error("Failed to execute failure listener", t);
- }
- }
- }
-
- /*
- * Re-attach sessions all pre-existing sessions to new remoting connections
- */
- private boolean reattachSessions(final int reconnectAttempts, final boolean reattach)
- {
- // We re-attach sessions per connection to ensure there is the same mapping of channel id
- // on live and backup connections
-
- Map<RemotingConnection, List<ClientSessionInternal>> sessionsPerConnection = new HashMap<RemotingConnection, List<ClientSessionInternal>>();
-
- for (Map.Entry<ClientSessionInternal, RemotingConnection> entry : sessions.entrySet())
- {
- ClientSessionInternal session = entry.getKey();
-
- RemotingConnection connection = entry.getValue();
-
- List<ClientSessionInternal> sessions = sessionsPerConnection.get(connection);
-
- if (sessions == null)
- {
- sessions = new ArrayList<ClientSessionInternal>();
-
- sessionsPerConnection.put(connection, sessions);
- }
-
- sessions.add(session);
- }
-
- boolean ok = true;
-
- for (Map.Entry<RemotingConnection, List<ClientSessionInternal>> entry : sessionsPerConnection.entrySet())
- {
- List<ClientSessionInternal> theSessions = entry.getValue();
-
- RemotingConnection backupConnection = getConnectionWithRetry(theSessions.size(), reconnectAttempts);
-
- if (backupConnection == null)
- {
- log.warn("Failed to connect to server.");
-
- ok = false;
-
- break;
- }
-
- List<FailureListener> oldListeners = entry.getKey().getFailureListeners();
-
- List<FailureListener> newListeners = new ArrayList<FailureListener>(backupConnection.getFailureListeners());
-
- for (FailureListener listener : oldListeners)
- {
- // Add all apart from the first one which is the old DelegatingFailureListener
-
- if (listener instanceof DelegatingFailureListener == false)
- {
- newListeners.add(listener);
- }
- }
-
- backupConnection.setFailureListeners(newListeners);
-
- for (ClientSessionInternal session : theSessions)
- {
- sessions.put(session, backupConnection);
- }
- }
-
- if (ok)
- {
- // If all connections got ok, then handle failover
- for (Map.Entry<ClientSessionInternal, RemotingConnection> entry : sessions.entrySet())
- {
- boolean b;
-
- if (reattach)
- {
- b = entry.getKey().handleReattach(entry.getValue());
- }
- else
- {
- b = entry.getKey().handleFailover(entry.getValue());
- }
-
- if (!b)
- {
- // If a session fails to re-attach we doom the lot, but we make sure we try all sessions and don't exit
- // early
- // or connections might be left lying around
- ok = false;
- }
- }
-
- log.info("Reconnected ok");
- }
-
- return ok;
- }
-
- private RemotingConnection getConnectionWithRetry(final int initialRefCount, final int reconnectAttempts)
- {
- long interval = retryInterval;
-
- int count = 0;
-
- while (true)
- {
- if (exitLoop)
- {
- return null;
- }
-
- RemotingConnection connection = getConnection(initialRefCount);
-
- if (connection == null)
- {
- // Failed to get connection
-
- if (reconnectAttempts != 0)
- {
- count++;
-
- if (reconnectAttempts != -1 && count == reconnectAttempts)
- {
- log.warn("Tried " + reconnectAttempts + " times to connect. Now giving up.");
-
- return null;
- }
-
- try
- {
- log.info("sleeping " + interval);
-
- Thread.sleep(interval);
- }
- catch (InterruptedException ignore)
- {
- }
-
- // Exponential back-off
- long newInterval = (long)((double)interval * retryIntervalMultiplier);
-
- if (newInterval > maxRetryInterval)
- {
- newInterval = maxRetryInterval;
- }
-
- interval = newInterval;
- }
- else
- {
- return null;
- }
- }
- else
- {
- return connection;
- }
- }
- }
-
- private void checkCloseConnections()
- {
- if (refCount == 0)
- {
- // Close connections
-
- Set<ConnectionEntry> copy = new HashSet<ConnectionEntry>(connections.values());
-
- if (pingerFuture != null)
- {
- pingRunnable.cancel();
-
- boolean ok = pingerFuture.cancel(false);
-
- pingRunnable = null;
-
- pingerFuture = null;
- }
-
- connections.clear();
-
- for (ConnectionEntry entry : copy)
- {
- try
- {
- entry.connection.destroy();
- }
- catch (Throwable ignore)
- {
- }
- }
-
- mapIterator = null;
-
- try
- {
- if (connector != null)
- {
- connector.close();
- }
- }
- catch (Throwable ignore)
- {
- }
-
- connector = null;
- }
-
- }
-
- public RemotingConnection getConnection(final int initialRefCount)
- {
- RemotingConnection conn;
-
- if (connections.size() < maxConnections)
- {
- // Create a new one
-
- Connection tc = null;
-
- try
- {
- if (connector == null)
- {
- DelegatingBufferHandler handler = new DelegatingBufferHandler();
-
- connector = connectorFactory.createConnector(transportParams,
- handler,
- this,
- threadPool,
- scheduledThreadPool);
-
- if (connector != null)
- {
- connector.start();
- }
- }
-
- if (connector != null)
- {
- tc = connector.createConnection();
-
- if (tc == null)
- {
- try
- {
- connector.close();
- }
- catch (Throwable t)
- {
- }
-
- connector = null;
- }
- }
- }
- catch (Exception e)
- {
- // Sanity catch for badly behaved remoting plugins
-
- log.warn("connector.create or connectorFactory.createConnector should never throw an exception, implementation is badly behaved, but we'll deal with it anyway.",
- e);
-
- if (tc != null)
- {
- try
- {
- tc.close();
- }
- catch (Throwable t)
- {
- }
- }
-
- if (connector != null)
- {
- try
- {
- connector.close();
- }
- catch (Throwable t)
- {
- }
- }
-
- tc = null;
-
- connector = null;
- }
-
- if (tc == null)
- {
- return null;
- }
-
- conn = new RemotingConnectionImpl(tc, callTimeout, interceptors);
-
- conn.addFailureListener(new DelegatingFailureListener(conn.getID()));
-
- conn.getChannel(0, -1, false).setHandler(new Channel0Handler(conn));
-
- connections.put(conn.getID(), new ConnectionEntry(conn,
- connector,
- clientFailureCheckPeriod,
- System.currentTimeMillis()));
-
- if (clientFailureCheckPeriod != -1)
- {
- if (pingerFuture == null)
- {
- pingRunnable = new PingRunnable();
-
- pingerFuture = scheduledThreadPool.scheduleWithFixedDelay(new ActualScheduled(pingRunnable),
- 0,
- clientFailureCheckPeriod,
- TimeUnit.MILLISECONDS);
- }
- // send a ping every time we create a new remoting connection
- // to set up its TTL on the server side
- else
- {
- pingRunnable.run();
- }
- }
-
- if (debug)
- {
- checkAddDebug(conn);
- }
- }
- else
- {
- // Return one round-robin from the list
-
- if (mapIterator == null || !mapIterator.hasNext())
- {
- mapIterator = connections.values().iterator();
- }
-
- ConnectionEntry entry = mapIterator.next();
-
- conn = entry.connection;
- }
-
- refCount += initialRefCount;
-
- return conn;
- }
-
- private void returnConnection(final Object connectionID)
- {
- ConnectionEntry entry = connections.get(connectionID);
-
- if (refCount != 0)
- {
- refCount--;
- }
-
- if (entry != null)
- {
- checkCloseConnections();
- }
- else
- {
- // Can be legitimately null if session was exitLoop before then went to remove session from csf
- // and locked since failover had started then after failover removes it but it's already been failed
- }
- }
-
- private ConnectorFactory instantiateConnectorFactory(final String connectorFactoryClassName)
- {
- ClassLoader loader = Thread.currentThread().getContextClassLoader();
- try
- {
- Class<?> clazz = loader.loadClass(connectorFactoryClassName);
- return (ConnectorFactory)clazz.newInstance();
- }
- catch (Exception e)
- {
- throw new IllegalArgumentException("Error instantiating connector factory \"" + connectorFactoryClassName +
- "\"", e);
- }
- }
-
- private void lockAllChannel1s()
- {
- for (ConnectionEntry entry : connections.values())
- {
- Channel channel1 = entry.connection.getChannel(1, -1, false);
-
- channel1.getLock().lock();
- }
- }
-
- private void unlockAllChannel1s()
- {
- for (ConnectionEntry entry : connections.values())
- {
- Channel channel1 = entry.connection.getChannel(1, -1, false);
-
- channel1.getLock().unlock();
- }
- }
-
- private void forceReturnAllChannel1s()
- {
- for (ConnectionEntry entry : connections.values())
- {
- Channel channel1 = entry.connection.getChannel(1, -1, false);
-
- channel1.returnBlocking();
- }
- }
-
- private void failConnection(final Object connectionID, final HornetQException me)
- {
- ConnectionEntry entry = connections.get(connectionID);
-
- if (entry != null)
- {
- RemotingConnection conn = entry.connection;
-
- conn.fail(me);
- }
- }
-
- private class Channel0Handler implements ChannelHandler
- {
- private final RemotingConnection conn;
-
- private Channel0Handler(final RemotingConnection conn)
- {
- this.conn = conn;
- }
-
- public void handlePacket(final Packet packet)
- {
- final byte type = packet.getType();
-
- if (type == PacketImpl.DISCONNECT)
- {
- threadPool.execute(new Runnable()
- {
- // Must be executed on new thread since cannot block the netty thread for a long time and fail can
- // cause reconnect loop
- public void run()
- {
- conn.fail(new HornetQException(HornetQException.DISCONNECTED,
- "The connection was disconnected because of server shutdown"));
- }
- });
- }
- }
- }
-
- private static class ConnectionEntry
- {
- ConnectionEntry(final RemotingConnection connection,
- final Connector connector,
- final long expiryPeriod,
- final long createTime)
- {
- this.connection = connection;
-
- this.connector = connector;
-
- this.expiryPeriod = expiryPeriod;
-
- this.lastCheck = createTime;
- }
-
- final RemotingConnection connection;
-
- final Connector connector;
-
- volatile long lastCheck;
-
- final long expiryPeriod;
- }
-
- private class DelegatingBufferHandler extends AbstractBufferHandler
- {
- public void bufferReceived(final Object connectionID, final HornetQBuffer buffer)
- {
- ConnectionEntry entry = connections.get(connectionID);
-
- if (entry != null)
- {
- entry.connection.bufferReceived(connectionID, buffer);
- }
- }
- }
-
- private class DelegatingFailureListener implements FailureListener
- {
- final Object connectionID;
-
- DelegatingFailureListener(final Object connectionID)
- {
- this.connectionID = connectionID;
- }
-
- public void connectionFailed(final HornetQException me)
- {
- handleConnectionFailure(me, connectionID);
- }
- }
-
- // Debug only
-
- private void checkAddDebug(final RemotingConnection conn)
- {
- Set<RemotingConnection> conns;
-
- synchronized (debugConns)
- {
- conns = debugConns.get(connectorConfig);
-
- if (conns == null)
- {
- conns = new HashSet<RemotingConnection>();
-
- debugConns.put(connectorConfig, conns);
- }
-
- conns.add(conn);
- }
- }
-
- public static void failAllConnectionsForConnector(final TransportConfiguration config)
- {
- Set<RemotingConnection> conns;
-
- synchronized (debugConns)
- {
- conns = debugConns.get(config);
-
- if (conns != null)
- {
- conns = new HashSet<RemotingConnection>(debugConns.get(config));
- }
- }
-
- if (conns != null)
- {
- for (RemotingConnection conn : conns)
- {
- conn.fail(new HornetQException(HornetQException.INTERNAL_ERROR, "blah"));
- }
- }
- }
-
- private static final class ActualScheduled implements Runnable
- {
- private final WeakReference<PingRunnable> pingRunnable;
-
- ActualScheduled(final PingRunnable runnable)
- {
- this.pingRunnable = new WeakReference<PingRunnable>(runnable);
- }
-
- public void run()
- {
- PingRunnable runnable = pingRunnable.get();
-
- if (runnable != null)
- {
- runnable.run();
- }
- }
-
- }
-
- private final class PingRunnable implements Runnable
- {
- private boolean cancelled;
-
- private boolean first;
-
- public synchronized void run()
- {
- if (cancelled || (stopPingingAfterOne && !first))
- {
- return;
- }
-
- first = false;
-
- synchronized (connections)
- {
- long now = System.currentTimeMillis();
-
- for (ConnectionEntry entry : connections.values())
- {
- final RemotingConnection connection = entry.connection;
-
- if (entry.expiryPeriod != -1 && now >= entry.lastCheck + entry.expiryPeriod)
- {
- if (!connection.checkDataReceived())
- {
- final HornetQException me = new HornetQException(HornetQException.CONNECTION_TIMEDOUT,
- "Did not receive data from server for " + connection.getTransportConnection());
-
- threadPool.execute(new Runnable()
- {
- // Must be executed on different thread
- public void run()
- {
- connection.fail(me);
- }
- });
-
- return;
- }
- else
- {
- entry.lastCheck = now;
- }
- }
-
- // Send a ping
-
- Ping ping = new Ping(connectionTTL);
-
- Channel channel0 = connection.getChannel(0, -1, false);
-
- channel0.send(ping);
- }
- }
- }
-
- public synchronized void cancel()
- {
- cancelled = true;
- }
- }
-
-}
Modified: trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java 2009-10-02 13:56:28 UTC (rev 8035)
+++ trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java 2009-10-02 15:47:08 UTC (rev 8036)
@@ -498,7 +498,7 @@
session.stop();
}
- public ConnectionManager getConnectionManager()
+ public FailoverManager getConnectionManager()
{
return session.getConnectionManager();
}
Copied: trunk/src/main/org/hornetq/core/client/impl/FailoverManager.java (from rev 8035, trunk/src/main/org/hornetq/core/client/impl/ConnectionManager.java)
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/FailoverManager.java (rev 0)
+++ trunk/src/main/org/hornetq/core/client/impl/FailoverManager.java 2009-10-02 15:47:08 UTC (rev 8036)
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.client.impl;
+
+import org.hornetq.core.client.ClientSession;
+import org.hornetq.core.exception.HornetQException;
+import org.hornetq.core.remoting.FailureListener;
+import org.hornetq.core.remoting.RemotingConnection;
+
+/**
+ * A FailoverManager
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ * Created 27 Nov 2008 18:45:46
+ *
+ *
+ */
+public interface FailoverManager
+{
+ ClientSession createSession(final String username,
+ final String password,
+ final boolean xa,
+ final boolean autoCommitSends,
+ final boolean autoCommitAcks,
+ final boolean preAcknowledge,
+ final int ackBatchSize,
+ final boolean cacheLargeMessageClient,
+ final int minLargeMessageSize,
+ final boolean blockOnAcknowledge,
+ final boolean autoGroup,
+ final int producerWindowSize,
+ final int consumerWindowSize,
+ final int producerMaxRate,
+ final int consumerMaxRate,
+ final boolean blockOnNonPersistentSend,
+ final boolean blockOnPersistentSend) throws HornetQException;
+
+ void removeSession(final ClientSessionInternal session);
+
+ int numConnections();
+
+ int numSessions();
+
+ void addFailureListener(FailureListener listener);
+
+ boolean removeFailureListener(FailureListener listener);
+
+ void causeExit();
+}
Copied: trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java (from rev 8035, trunk/src/main/org/hornetq/core/client/impl/ConnectionManagerImpl.java)
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java (rev 0)
+++ trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java 2009-10-02 15:47:08 UTC (rev 8036)
@@ -0,0 +1,1097 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.client.impl;
+
+import java.lang.ref.WeakReference;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+
+import org.hornetq.core.client.ClientSession;
+import org.hornetq.core.client.ClientSessionFactory;
+import org.hornetq.core.config.TransportConfiguration;
+import org.hornetq.core.exception.HornetQException;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.Channel;
+import org.hornetq.core.remoting.ChannelHandler;
+import org.hornetq.core.remoting.FailureListener;
+import org.hornetq.core.remoting.Interceptor;
+import org.hornetq.core.remoting.Packet;
+import org.hornetq.core.remoting.RemotingConnection;
+import org.hornetq.core.remoting.impl.AbstractBufferHandler;
+import org.hornetq.core.remoting.impl.RemotingConnectionImpl;
+import org.hornetq.core.remoting.impl.wireformat.CreateSessionMessage;
+import org.hornetq.core.remoting.impl.wireformat.CreateSessionResponseMessage;
+import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
+import org.hornetq.core.remoting.impl.wireformat.Ping;
+import org.hornetq.core.remoting.spi.Connection;
+import org.hornetq.core.remoting.spi.ConnectionLifeCycleListener;
+import org.hornetq.core.remoting.spi.Connector;
+import org.hornetq.core.remoting.spi.ConnectorFactory;
+import org.hornetq.core.remoting.spi.HornetQBuffer;
+import org.hornetq.core.version.Version;
+import org.hornetq.utils.ConcurrentHashSet;
+import org.hornetq.utils.ExecutorFactory;
+import org.hornetq.utils.OrderedExecutorFactory;
+import org.hornetq.utils.UUIDGenerator;
+import org.hornetq.utils.VersionLoader;
+
+/**
+ * A FailoverManagerImpl
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ * Created 27 Nov 2008 18:46:06
+ *
+ */
+public class FailoverManagerImpl implements FailoverManager, ConnectionLifeCycleListener
+{
+ // Constants
+ // ------------------------------------------------------------------------------------
+
+ private static final long serialVersionUID = 2512460695662741413L;
+
+ private static final Logger log = Logger.getLogger(FailoverManagerImpl.class);
+
+ // Attributes
+ // -----------------------------------------------------------------------------------
+
+ // We hold this reference for GC reasons
+ private final ClientSessionFactory sessionFactory;
+
+ private final TransportConfiguration connectorConfig;
+
+ private final TransportConfiguration backupConfig;
+
+ private ConnectorFactory connectorFactory;
+
+ private Map<String, Object> transportParams;
+
+ private ConnectorFactory backupConnectorFactory;
+
+ private Map<String, Object> backupTransportParams;
+
+ private final long callTimeout;
+
+ private final long clientFailureCheckPeriod;
+
+ private final long connectionTTL;
+
+ private final Set<ClientSessionInternal> sessions = new HashSet<ClientSessionInternal>();
+
+ private final Object exitLock = new Object();
+
+ private final Object createSessionLock = new Object();
+
+ private boolean inCreateSession;
+
+ private final Object failoverLock = new Object();
+
+ private final ExecutorFactory orderedExecutorFactory;
+
+ private final ExecutorService threadPool;
+
+ private final ScheduledExecutorService scheduledThreadPool;
+
+ private RemotingConnection connection;
+
+ private final long retryInterval;
+
+ private final double retryIntervalMultiplier; // For exponential backoff
+
+ private final long maxRetryInterval;
+
+ private final int reconnectAttempts;
+
+ private boolean failoverOnServerShutdown;
+
+ private Set<FailureListener> listeners = new ConcurrentHashSet<FailureListener>();
+
+ private Connector connector;
+
+ private Future<?> pingerFuture;
+
+ private PingRunnable pingRunnable;
+
+ private volatile boolean exitLoop;
+
+ private final List<Interceptor> interceptors;
+
+ private final boolean useReattach;
+
+ // Static
+ // ---------------------------------------------------------------------------------------
+
+ // Constructors
+ // ---------------------------------------------------------------------------------
+
+ public FailoverManagerImpl(final ClientSessionFactory sessionFactory,
+ final TransportConfiguration connectorConfig,
+ final TransportConfiguration backupConfig,
+ final boolean failoverOnServerShutdown,
+ final long callTimeout,
+ final long clientFailureCheckPeriod,
+ final long connectionTTL,
+ final long retryInterval,
+ final double retryIntervalMultiplier,
+ final long maxRetryInterval,
+ final int reconnectAttempts,
+ final boolean useReattach,
+ final ExecutorService threadPool,
+ final ScheduledExecutorService scheduledThreadPool,
+ final List<Interceptor> interceptors)
+ {
+ this.sessionFactory = sessionFactory;
+
+ this.connectorConfig = connectorConfig;
+
+ this.backupConfig = backupConfig;
+
+ this.failoverOnServerShutdown = failoverOnServerShutdown;
+
+ connectorFactory = instantiateConnectorFactory(connectorConfig.getFactoryClassName());
+
+ transportParams = connectorConfig.getParams();
+
+ if (backupConfig != null)
+ {
+ backupConnectorFactory = instantiateConnectorFactory(backupConfig.getFactoryClassName());
+
+ backupTransportParams = backupConfig.getParams();
+ }
+ else
+ {
+ backupConnectorFactory = null;
+
+ backupTransportParams = null;
+ }
+
+ this.callTimeout = callTimeout;
+
+ this.clientFailureCheckPeriod = clientFailureCheckPeriod;
+
+ this.connectionTTL = connectionTTL;
+
+ this.retryInterval = retryInterval;
+
+ this.retryIntervalMultiplier = retryIntervalMultiplier;
+
+ this.maxRetryInterval = maxRetryInterval;
+
+ this.reconnectAttempts = reconnectAttempts;
+
+ this.useReattach = useReattach;
+
+ this.scheduledThreadPool = scheduledThreadPool;
+
+ this.threadPool = threadPool;
+
+ this.orderedExecutorFactory = new OrderedExecutorFactory(threadPool);
+
+ this.interceptors = interceptors;
+ }
+
+ // ConnectionLifeCycleListener implementation --------------------------------------------------
+
+ public void connectionCreated(final Connection connection)
+ {
+ }
+
+ public void connectionDestroyed(final Object connectionID)
+ {
+ this.handleConnectionFailure(connectionID, new HornetQException(HornetQException.NOT_CONNECTED,
+ "Channel disconnected"));
+ }
+
+ public void connectionException(final Object connectionID, final HornetQException me)
+ {
+ this.handleConnectionFailure(connectionID, me);
+ }
+
+ // FailoverManager implementation ------------------------------------------------------------------
+
+ public ClientSession createSession(final String username,
+ final String password,
+ final boolean xa,
+ final boolean autoCommitSends,
+ final boolean autoCommitAcks,
+ final boolean preAcknowledge,
+ final int ackBatchSize,
+ final boolean cacheLargeMessageClient,
+ final int minLargeMessageSize,
+ final boolean blockOnAcknowledge,
+ final boolean autoGroup,
+ final int producerWindowSize,
+ final int consumerWindowSize,
+ final int producerMaxRate,
+ final int consumerMaxRate,
+ final boolean blockOnNonPersistentSend,
+ final boolean blockOnPersistentSend) throws HornetQException
+ {
+ synchronized (createSessionLock)
+ {
+ String name = UUIDGenerator.getInstance().generateSimpleStringUUID().toString();
+
+ boolean retry = false;
+ do
+ {
+ Version clientVersion = VersionLoader.getVersion();
+
+ RemotingConnection connection = null;
+
+ Lock lock = null;
+
+ try
+ {
+ Channel channel1;
+
+ synchronized (failoverLock)
+ {
+ connection = getConnectionWithRetry(reconnectAttempts);
+
+ if (connection == null)
+ {
+ if (exitLoop)
+ {
+ return null;
+ }
+ // This can happen if no connection could be established - e.g. the server gets shut down
+
+ throw new HornetQException(HornetQException.NOT_CONNECTED,
+ "Unable to connect to server using configuration " + connectorConfig);
+
+ }
+
+ channel1 = connection.getChannel(1, -1, false);
+
+ // Lock it - this must be done while the failoverLock is held
+ channel1.getLock().lock();
+
+ lock = channel1.getLock();
+ } // We can now release the failoverLock
+
+ // We now set a flag saying createSession is executing
+ synchronized (exitLock)
+ {
+ inCreateSession = true;
+ }
+
+ long sessionChannelID = connection.generateChannelID();
+
+ Packet request = new CreateSessionMessage(name,
+ sessionChannelID,
+ clientVersion.getIncrementingVersion(),
+ username,
+ password,
+ minLargeMessageSize,
+ xa,
+ autoCommitSends,
+ autoCommitAcks,
+ preAcknowledge,
+ producerWindowSize);
+
+ Packet pResponse;
+ try
+ {
+ pResponse = channel1.sendBlocking(request);
+ }
+ catch (HornetQException e)
+ {
+ if (e.getCode() == HornetQException.UNBLOCKED)
+ {
+ // This means the thread was blocked on create session and failover unblocked it
+ // so failover could occur
+
+ // So we just need to return our connections and flag for retry
+
+ checkCloseConnection();
+
+ retry = true;
+
+ continue;
+ }
+ else
+ {
+ throw e;
+ }
+ }
+
+ CreateSessionResponseMessage response = (CreateSessionResponseMessage)pResponse;
+
+ Channel sessionChannel = connection.getChannel(sessionChannelID,
+ producerWindowSize,
+ producerWindowSize != -1);
+
+ ClientSessionInternal session = new ClientSessionImpl(this,
+ name,
+ username,
+ password,
+ xa,
+ autoCommitSends,
+ autoCommitAcks,
+ preAcknowledge,
+ blockOnAcknowledge,
+ autoGroup,
+ ackBatchSize,
+ consumerWindowSize,
+ consumerMaxRate,
+ producerWindowSize,
+ producerMaxRate,
+ blockOnNonPersistentSend,
+ blockOnPersistentSend,
+ cacheLargeMessageClient,
+ minLargeMessageSize,
+ connection,
+ response.getServerVersion(),
+ sessionChannel,
+ orderedExecutorFactory.getExecutor());
+
+ sessions.add(session);
+
+ ChannelHandler handler = new ClientSessionPacketHandler(session, sessionChannel);
+
+ sessionChannel.setHandler(handler);
+
+ return new DelegatingSession(session);
+ }
+ catch (Throwable t)
+ {
+ if (lock != null)
+ {
+ lock.unlock();
+
+ lock = null;
+ }
+
+ checkCloseConnection();
+
+ if (t instanceof HornetQException)
+ {
+ throw (HornetQException)t;
+ }
+ else
+ {
+ HornetQException me = new HornetQException(HornetQException.INTERNAL_ERROR,
+ "Failed to create session");
+
+ me.initCause(t);
+
+ throw me;
+ }
+ }
+ finally
+ {
+ if (lock != null)
+ {
+ lock.unlock();
+ }
+
+ // Execution has finished so notify any failover thread that may be waiting for us to be done
+ synchronized (exitLock)
+ {
+ inCreateSession = false;
+
+ exitLock.notify();
+ }
+ }
+ }
+ while (retry);
+ }
+
+ // Should never get here
+ throw new IllegalStateException("Oh my God it's full of stars!");
+ }
+
+ // Must be synchronized to prevent it happening concurrently with failover which can lead to
+ // inconsistencies
+ public void removeSession(final ClientSessionInternal session)
+ {
+ // TODO - can we simplify this locking?
+ synchronized (createSessionLock)
+ {
+ synchronized (failoverLock)
+ {
+ sessions.remove(session);
+
+ checkCloseConnection();
+ }
+ }
+ }
+
+ /**
+  * @return 1 when a remoting connection is currently held, otherwise 0
+  */
+ public synchronized int numConnections()
+ {
+    if (connection == null)
+    {
+       return 0;
+    }
+
+    return 1;
+ }
+
+ /**
+  * @return the number of sessions currently tracked by this manager
+  */
+ public int numSessions()
+ {
+    final int sessionCount = sessions.size();
+
+    return sessionCount;
+ }
+
+ /**
+  * Registers a listener that is invoked by {@code callFailureListeners} after a connection failure
+  * has been handled.
+  *
+  * @param listener the listener to add
+  */
+ public void addFailureListener(final FailureListener listener)
+ {
+    this.listeners.add(listener);
+ }
+
+ /**
+  * Unregisters a previously added failure listener.
+  *
+  * @param listener the listener to remove
+  * @return the result of removing the listener from the underlying collection
+  */
+ public boolean removeFailureListener(final FailureListener listener)
+ {
+    final boolean removed = listeners.remove(listener);
+
+    return removed;
+ }
+
+ /**
+  * Raises the {@code exitLoop} flag, which makes the connection retry loop give up at the start of
+  * its next iteration.
+  */
+ public void causeExit()
+ {
+    this.exitLoop = true;
+ }
+
+ // Public
+ // ---------------------------------------------------------------------------------------
+
+ private volatile boolean stopPingingAfterOne;
+
+ /**
+  * Raises the {@code stopPingingAfterOne} flag that the ping task consults before sending a ping.
+  */
+ public void stopPingingAfterOne()
+ {
+    stopPingingAfterOne = true;
+ }
+
+ // Protected
+ // ------------------------------------------------------------------------------------
+
+ // Package Private
+ // ------------------------------------------------------------------------------
+
+ // Private
+ // --------------------------------------------------------------------------------------
+
+ /**
+  * Entry point used by the connection's delegating failure listener; simply hands the failure over
+  * to {@code failoverOrReconnect}.
+  *
+  * @param connectionID ID of the connection the failure was reported for
+  * @param me           the failure
+  */
+ private void handleConnectionFailure(final Object connectionID, final HornetQException me)
+ {
+    this.failoverOrReconnect(connectionID, me);
+ }
+
+ private void failoverOrReconnect(final Object connectionID, final HornetQException me)
+ {
+ boolean done = false;
+
+ synchronized (failoverLock)
+ {
+ if (connection == null || connection.getID() != connectionID)
+ {
+ // We already failed over/reconnected - probably the first failure came in, all the connections were failed
+ // over then a async connection exception or disconnect
+ // came in for one of the already exitLoop connections, so we return true - we don't want to call the
+ // listeners again
+ return;
+ }
+
+ // Now get locks on all channel 1s, whilst holding the failoverLock - this makes sure
+ // There are either no threads executing in createSession, or one is blocking on a createSession
+ // result.
+
+ // Then interrupt the channel 1 that is blocking (could just interrupt them all)
+
+ // Then release all channel 1 locks - this allows the createSession to exit the monitor
+
+ // Then get all channel 1 locks again - this ensures the any createSession thread has executed the section and
+ // returned all its connections to the connection manager (the code to return connections to connection manager
+ // must be inside the lock
+
+ // Then perform failover
+
+ // Then release failoverLock
+
+ // The other side of the bargain - during createSession:
+ // The calling thread must get the failoverLock and get its' connections when this is locked.
+ // While this is still locked it must then get the channel1 lock
+ // It can then release the failoverLock
+ // It should catch HornetQException.INTERRUPTED in the call to channel.sendBlocking
+ // It should then return its connections, with channel 1 lock still held
+ // It can then release the channel 1 lock, and retry (which will cause locking on failoverLock
+ // until failover is complete
+
+ boolean serverShutdown = me.getCode() == HornetQException.DISCONNECTED;
+
+ // We will try to failover if there is a backup connector factory, but we don't do this if the server
+ // has been shutdown cleanly unless failoverOnServerShutdown is true
+ boolean attemptFailover = backupConnectorFactory != null && (failoverOnServerShutdown || !serverShutdown);
+
+ boolean attemptReconnect;
+
+ if (attemptFailover)
+ {
+ attemptReconnect = false;
+ }
+ else
+ {
+ attemptReconnect = reconnectAttempts != 0;
+ }
+
+ if (attemptFailover || attemptReconnect)
+ {
+ lockChannel1();
+
+ final boolean needToInterrupt;
+
+ synchronized (exitLock)
+ {
+ needToInterrupt = inCreateSession;
+ }
+
+ unlockChannel1();
+
+ if (needToInterrupt)
+ {
+ // Forcing return all channels won't guarantee that any blocked thread will return immediately
+ // So we need to wait for it
+ forceReturnChannel1();
+
+ // Now we need to make sure that the thread has actually exited and returned it's connections
+ // before failover occurs
+
+ synchronized (exitLock)
+ {
+ while (inCreateSession)
+ {
+ try
+ {
+ exitLock.wait(5000);
+ }
+ catch (InterruptedException e)
+ {
+ }
+ }
+ }
+ }
+
+ // Now we absolutely know that no threads are executing in or blocked in createSession, and no
+ // more will execute it until failover is complete
+
+ // So.. do failover / reconnection
+
+ RemotingConnection oldConnection = connection;
+
+ connection = null;
+
+ try
+ {
+ connector.close();
+ }
+ catch (Exception ignore)
+ {
+ }
+
+ connector = null;
+
+ if (attemptFailover)
+ {
+ // Now try failing over to backup
+
+ connectorFactory = backupConnectorFactory;
+
+ transportParams = backupTransportParams;
+
+ backupConnectorFactory = null;
+
+ backupTransportParams = null;
+
+ done = reattachSessions(reconnectAttempts == -1 ? -1 : reconnectAttempts + 1, false);
+ }
+ else
+ {
+ done = reattachSessions(reconnectAttempts, useReattach);
+ }
+
+ if (done)
+ {
+ // Destroy the old connection
+
+ oldConnection.destroy();
+ }
+ else
+ {
+ oldConnection.destroy();
+ }
+ }
+
+ // We always call the failure listeners
+ callFailureListeners(me);
+ }
+ }
+
+ private void callFailureListeners(final HornetQException me)
+ {
+ final List<FailureListener> listenersClone = new ArrayList<FailureListener>(listeners);
+
+ for (final FailureListener listener : listenersClone)
+ {
+ try
+ {
+ listener.connectionFailed(me);
+ }
+ catch (final Throwable t)
+ {
+ // Failure of one listener to execute shouldn't prevent others
+ // from
+ // executing
+ log.error("Failed to execute failure listener", t);
+ }
+ }
+ }
+
+ /*
+ * Re-attach sessions all pre-existing sessions to the new remoting connection
+ */
+ private boolean reattachSessions(final int reconnectAttempts, final boolean reattach)
+ {
+ RemotingConnection backupConnection = getConnectionWithRetry(reconnectAttempts);
+
+ if (backupConnection == null)
+ {
+ log.warn("Failed to connect to server.");
+
+ return false;
+ }
+
+ List<FailureListener> oldListeners = connection.getFailureListeners();
+
+ List<FailureListener> newListeners = new ArrayList<FailureListener>(backupConnection.getFailureListeners());
+
+ for (FailureListener listener : oldListeners)
+ {
+ // Add all apart from the first one which is the old DelegatingFailureListener
+
+ if (listener instanceof DelegatingFailureListener == false)
+ {
+ newListeners.add(listener);
+ }
+ }
+
+ backupConnection.setFailureListeners(newListeners);
+
+ boolean ok = true;
+
+ // If all connections got ok, then handle failover
+ for (ClientSessionInternal session : sessions)
+ {
+ boolean b;
+
+ if (reattach)
+ {
+ b = session.handleReattach(backupConnection);
+ }
+ else
+ {
+ b = session.handleFailover(backupConnection);
+ }
+
+ if (!b)
+ {
+ // If a session fails to re-attach we doom the lot, but we make sure we try all sessions and don't exit
+ // early
+ // or connections might be left lying around
+ ok = false;
+ }
+ }
+
+ return ok;
+ }
+
+ private RemotingConnection getConnectionWithRetry(final int reconnectAttempts)
+ {
+ long interval = retryInterval;
+
+ int count = 0;
+
+ while (true)
+ {
+ if (exitLoop)
+ {
+ return null;
+ }
+
+ RemotingConnection connection = getConnection();
+
+ if (connection == null)
+ {
+ // Failed to get connection
+
+ if (reconnectAttempts != 0)
+ {
+ count++;
+
+ if (reconnectAttempts != -1 && count == reconnectAttempts)
+ {
+ log.warn("Tried " + reconnectAttempts + " times to connect. Now giving up.");
+
+ return null;
+ }
+
+ try
+ {
+ Thread.sleep(interval);
+ }
+ catch (InterruptedException ignore)
+ {
+ }
+
+ // Exponential back-off
+ long newInterval = (long)((double)interval * retryIntervalMultiplier);
+
+ if (newInterval > maxRetryInterval)
+ {
+ newInterval = maxRetryInterval;
+ }
+
+ interval = newInterval;
+ }
+ else
+ {
+ return null;
+ }
+ }
+ else
+ {
+ return connection;
+ }
+ }
+ }
+
+ /**
+  * Releases the connection once no sessions are left: cancels the pinger if one is scheduled,
+  * destroys the remoting connection and closes the connector. Does nothing while there is no
+  * connection or at least one session remains. Errors during destroy/close are ignored.
+  */
+ private void checkCloseConnection()
+ {
+    if (connection == null || sessions.size() != 0)
+    {
+       return;
+    }
+
+    if (pingerFuture != null)
+    {
+       pingRunnable.cancel();
+
+       // The result of cancel is of no interest here
+       pingerFuture.cancel(false);
+
+       pingRunnable = null;
+
+       pingerFuture = null;
+    }
+
+    try
+    {
+       connection.destroy();
+    }
+    catch (Throwable ignore)
+    {
+       // Best effort - the connection is being discarded
+    }
+
+    connection = null;
+
+    try
+    {
+       if (connector != null)
+       {
+          connector.close();
+       }
+    }
+    catch (Throwable ignore)
+    {
+       // Best effort - the connector is being discarded
+    }
+
+    connector = null;
+ }
+
+ /**
+  * Returns the current remoting connection, creating it first if there is none.
+  * <p>
+  * Creation lazily builds and starts the connector, asks it for a transport connection, wraps that
+  * in a {@code RemotingConnectionImpl}, installs the delegating failure listener and the channel 0
+  * handler and, when {@code clientFailureCheckPeriod} is not -1, either schedules the pinger or, if
+  * one is already scheduled, runs it once straight away.
+  *
+  * @return the connection, or null when no transport connection could be created
+  */
+ public RemotingConnection getConnection()
+ {
+    if (connection != null)
+    {
+       return connection;
+    }
+
+    Connection transportConnection = null;
+
+    try
+    {
+       if (connector == null)
+       {
+          final DelegatingBufferHandler bufferHandler = new DelegatingBufferHandler();
+
+          connector = connectorFactory.createConnector(transportParams,
+                                                       bufferHandler,
+                                                       this,
+                                                       threadPool,
+                                                       scheduledThreadPool);
+
+          if (connector != null)
+          {
+             connector.start();
+          }
+       }
+
+       if (connector != null)
+       {
+          transportConnection = connector.createConnection();
+
+          if (transportConnection == null)
+          {
+             // No transport connection available - drop the connector so the next call starts afresh
+             try
+             {
+                connector.close();
+             }
+             catch (Throwable t)
+             {
+             }
+
+             connector = null;
+          }
+       }
+    }
+    catch (Exception e)
+    {
+       // Sanity catch for badly behaved remoting plugins
+
+       log.warn("connector.create or connectorFactory.createConnector should never throw an exception, implementation is badly behaved, but we'll deal with it anyway.",
+                e);
+
+       if (transportConnection != null)
+       {
+          try
+          {
+             transportConnection.close();
+          }
+          catch (Throwable t)
+          {
+          }
+       }
+
+       if (connector != null)
+       {
+          try
+          {
+             connector.close();
+          }
+          catch (Throwable t)
+          {
+          }
+       }
+
+       transportConnection = null;
+
+       connector = null;
+    }
+
+    if (transportConnection == null)
+    {
+       return connection;
+    }
+
+    connection = new RemotingConnectionImpl(transportConnection, callTimeout, interceptors);
+
+    connection.addFailureListener(new DelegatingFailureListener(connection.getID()));
+
+    connection.getChannel(0, -1, false).setHandler(new Channel0Handler(connection));
+
+    if (clientFailureCheckPeriod != -1)
+    {
+       if (pingerFuture != null)
+       {
+          // send a ping every time we create a new remoting connection
+          // to set up its TTL on the server side
+          pingRunnable.run();
+       }
+       else
+       {
+          pingRunnable = new PingRunnable();
+
+          pingerFuture = scheduledThreadPool.scheduleWithFixedDelay(new ActualScheduled(pingRunnable),
+                                                                    0,
+                                                                    clientFailureCheckPeriod,
+                                                                    TimeUnit.MILLISECONDS);
+       }
+    }
+
+    return connection;
+ }
+
+ private ConnectorFactory instantiateConnectorFactory(final String connectorFactoryClassName)
+ {
+ ClassLoader loader = Thread.currentThread().getContextClassLoader();
+ try
+ {
+ Class<?> clazz = loader.loadClass(connectorFactoryClassName);
+ return (ConnectorFactory)clazz.newInstance();
+ }
+ catch (Exception e)
+ {
+ throw new IllegalArgumentException("Error instantiating connector factory \"" + connectorFactoryClassName +
+ "\"", e);
+ }
+ }
+
+ /**
+  * Acquires the lock of channel 1 of the current connection.
+  */
+ private void lockChannel1()
+ {
+    connection.getChannel(1, -1, false).getLock().lock();
+ }
+
+ /**
+  * Releases the lock of channel 1 of the current connection.
+  */
+ private void unlockChannel1()
+ {
+    connection.getChannel(1, -1, false).getLock().unlock();
+ }
+
+ /**
+  * Calls {@code returnBlocking()} on channel 1 of the current connection, to make a blocking call
+  * on that channel return.
+  */
+ private void forceReturnChannel1()
+ {
+    connection.getChannel(1, -1, false).returnBlocking();
+ }
+
+ /**
+  * Handler installed on channel 0 of a connection. It reacts only to DISCONNECT packets, failing
+  * the connection it was created for with a DISCONNECTED exception; all other packets are ignored.
+  */
+ private class Channel0Handler implements ChannelHandler
+ {
+    private final RemotingConnection conn;
+
+    private Channel0Handler(final RemotingConnection conn)
+    {
+       this.conn = conn;
+    }
+
+    public void handlePacket(final Packet packet)
+    {
+       if (packet.getType() != PacketImpl.DISCONNECT)
+       {
+          return;
+       }
+
+       // Must be executed on new thread since cannot block the netty thread for a long time and fail can
+       // cause reconnect loop
+       final Runnable failTask = new Runnable()
+       {
+          public void run()
+          {
+             conn.fail(new HornetQException(HornetQException.DISCONNECTED,
+                                            "The connection was disconnected because of server shutdown"));
+          }
+       };
+
+       threadPool.execute(failTask);
+    }
+ }
+
+ /**
+  * Buffer handler given to the connector. It forwards a received buffer to the current remoting
+  * connection, but only when the buffer belongs to that connection; buffers for any other
+  * connection ID are dropped.
+  */
+ private class DelegatingBufferHandler extends AbstractBufferHandler
+ {
+    public void bufferReceived(final Object connectionID, final HornetQBuffer buffer)
+    {
+       // Read the field once: failover sets it to null from another thread, so checking and then
+       // dereferencing the field itself could fail between the two steps
+       final RemotingConnection current = connection;
+
+       if (current != null && connectionID == current.getID())
+       {
+          current.bufferReceived(connectionID, buffer);
+       }
+    }
+ }
+
+ /**
+  * Failure listener added to each new remoting connection. It remembers the ID of that connection
+  * and passes any reported failure, together with the ID, to {@code handleConnectionFailure}.
+  */
+ private class DelegatingFailureListener implements FailureListener
+ {
+    private final Object id;
+
+    DelegatingFailureListener(final Object connectionID)
+    {
+       id = connectionID;
+    }
+
+    public void connectionFailed(final HornetQException me)
+    {
+       handleConnectionFailure(id, me);
+    }
+ }
+
+ /**
+  * The runnable actually handed to the scheduler. It refers to the real ping task only through a
+  * weak reference and does nothing once that reference has been cleared.
+  */
+ private static final class ActualScheduled implements Runnable
+ {
+    private final WeakReference<PingRunnable> target;
+
+    ActualScheduled(final PingRunnable runnable)
+    {
+       target = new WeakReference<PingRunnable>(runnable);
+    }
+
+    public void run()
+    {
+       final PingRunnable delegate = target.get();
+
+       if (delegate == null)
+       {
+          return;
+       }
+
+       delegate.run();
+    }
+ }
+
+ /**
+  * Periodic task that checks whether data has been received from the server and sends a ping.
+  * <p>
+  * Once at least {@code clientFailureCheckPeriod} ms have passed since the last successful check, the
+  * connection is asked whether data was received; if not, it is failed with CONNECTION_TIMEDOUT on a
+  * pool thread and no ping is sent. Otherwise a {@code Ping} carrying {@code connectionTTL} is sent on
+  * channel 0.
+  */
+ private final class PingRunnable implements Runnable
+ {
+    private boolean cancelled;
+
+    // NOTE(review): nothing in this class ever sets 'first' to true, so once stopPingingAfterOne is set
+    // no further ping is sent at all - confirm whether it was meant to start out as true.
+    private boolean first;
+
+    private long lastCheck = System.currentTimeMillis();
+
+    public synchronized void run()
+    {
+       if (cancelled || (stopPingingAfterOne && !first))
+       {
+          return;
+       }
+
+       first = false;
+
+       // Work on a snapshot: the connection field is set to null while failover/reconnection is in
+       // progress, and an exception escaping from a fixed-delay task would suppress all later runs
+       final RemotingConnection conn = connection;
+
+       if (conn == null)
+       {
+          return;
+       }
+
+       final long now = System.currentTimeMillis();
+
+       if (clientFailureCheckPeriod != -1 && now >= lastCheck + clientFailureCheckPeriod)
+       {
+          if (!conn.checkDataReceived())
+          {
+             final HornetQException me = new HornetQException(HornetQException.CONNECTION_TIMEDOUT,
+                                                              "Did not receive data from server for " + conn.getTransportConnection());
+
+             threadPool.execute(new Runnable()
+             {
+                // Must be executed on different thread
+                public void run()
+                {
+                   // Fail the connection that was checked, not whatever is current by the time this runs
+                   conn.fail(me);
+                }
+             });
+
+             return;
+          }
+
+          lastCheck = now;
+       }
+
+       // Send a ping
+
+       final Ping ping = new Ping(connectionTTL);
+
+       final Channel channel0 = conn.getChannel(0, -1, false);
+
+       channel0.send(ping);
+    }
+
+    /**
+     * Marks this task as cancelled so that subsequent runs do nothing.
+     */
+    public synchronized void cancel()
+    {
+       cancelled = true;
+    }
+ }
+
+}
Modified: trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java 2009-10-02 13:56:28 UTC (rev 8035)
+++ trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java 2009-10-02 15:47:08 UTC (rev 8036)
@@ -381,11 +381,11 @@
public void replayCommands(final int otherLastReceivedCommandID, final long newChannelID)
{
clearUpTo(otherLastReceivedCommandID);
-
+
for (final Packet packet : resendCache)
{
packet.setChannelID(newChannelID);
-
+
doWrite(packet);
}
}
Modified: trunk/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java 2009-10-02 13:56:28 UTC (rev 8035)
+++ trunk/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java 2009-10-02 15:47:08 UTC (rev 8036)
@@ -20,6 +20,7 @@
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
+import org.hornetq.core.client.ClientMessage;
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.remoting.Channel;
@@ -29,9 +30,11 @@
import org.hornetq.core.remoting.Packet;
import org.hornetq.core.remoting.RemotingConnection;
import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
+import org.hornetq.core.remoting.impl.wireformat.SessionReceiveMessage;
import org.hornetq.core.remoting.spi.Connection;
import org.hornetq.core.remoting.spi.HornetQBuffer;
import org.hornetq.utils.SimpleIDGenerator;
+import org.hornetq.utils.SimpleString;
/**
* @author <a href="tim.fox at jboss.com">Tim Fox</a>
@@ -330,11 +333,11 @@
public void bufferReceived(final Object connectionID, final HornetQBuffer buffer)
{
final Packet packet = decoder.decode(buffer);
-
+
if (executor == null || packet.getType() == PacketImpl.PING)
{
// Pings must always be handled out of band so we can send pings back to the client quickly
- // otherwise they would get in the queue with everything else which might give a intolerable delay
+ // otherwise they would get in the queue with everything else which might give an intolerable delay
doBufferReceived(packet);
}
else
Modified: trunk/src/main/org/hornetq/core/server/HornetQServer.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/HornetQServer.java 2009-10-02 13:56:28 UTC (rev 8035)
+++ trunk/src/main/org/hornetq/core/server/HornetQServer.java 2009-10-02 15:47:08 UTC (rev 8036)
@@ -109,8 +109,6 @@
SimpleString getNodeID();
- // void initialiseBackup(UUID nodeID, long currentMessageID) throws Exception;
-
boolean isInitialised();
Queue createQueue(SimpleString address,
Modified: trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2009-10-02 13:56:28 UTC (rev 8035)
+++ trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2009-10-02 15:47:08 UTC (rev 8036)
@@ -537,6 +537,7 @@
{
if (flowRecord != null)
{
+
if (notifConsumer != null)
{
try
Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-10-02 13:56:28 UTC (rev 8035)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-10-02 15:47:08 UTC (rev 8036)
@@ -32,7 +32,7 @@
import javax.management.MBeanServer;
-import org.hornetq.core.client.impl.ConnectionManager;
+import org.hornetq.core.client.impl.FailoverManager;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.cluster.DivertConfiguration;
import org.hornetq.core.config.cluster.QueueConfiguration;
@@ -189,7 +189,7 @@
private static AtomicInteger managementConnectorSequence = new AtomicInteger(0);
- private ConnectionManager replicatingConnectionManager;
+ private FailoverManager replicatingFailoverManager;
private final Set<ActivateCallback> activateCallbacks = new HashSet<ActivateCallback>();
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2009-10-02 13:56:28 UTC (rev 8035)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2009-10-02 15:47:08 UTC (rev 8036)
@@ -46,6 +46,7 @@
import org.hornetq.core.server.ServerSession;
import org.hornetq.core.transaction.Transaction;
import org.hornetq.core.transaction.impl.TransactionImpl;
+import org.hornetq.utils.SimpleString;
import org.hornetq.utils.TypedProperties;
/**
@@ -485,7 +486,7 @@
}
final ServerMessage message = ref.getMessage();
-
+
if (filter != null && !filter.match(message))
{
return HandleStatus.NO_MATCH;
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-10-02 13:56:28 UTC (rev 8035)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-10-02 15:47:08 UTC (rev 8036)
@@ -1572,7 +1572,7 @@
public int transferConnection(final RemotingConnection newConnection, final int lastReceivedCommandID)
{
boolean wasStarted = this.started;
-
+
if (wasStarted)
{
this.setStarted(false);
@@ -1589,7 +1589,7 @@
// received responses that the backup did not know about.
channel.transferConnection(newConnection);
-
+
newConnection.syncIDGeneratorSequence(remotingConnection.getIDGeneratorSequence());
remotingConnection = newConnection;
@@ -1605,7 +1605,7 @@
{
this.setStarted(true);
}
-
+
return serverLastReceivedCommandID;
}
Modified: trunk/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java 2009-10-02 13:56:28 UTC (rev 8035)
+++ trunk/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java 2009-10-02 15:47:08 UTC (rev 8036)
@@ -413,16 +413,6 @@
sessionFactory.setAutoGroup(autoGroup);
}
- public synchronized int getMaxConnections()
- {
- return sessionFactory.getMaxConnections();
- }
-
- public synchronized void setMaxConnections(int maxConnections)
- {
- sessionFactory.setMaxConnections(maxConnections);
- }
-
public synchronized boolean isPreAcknowledge()
{
return sessionFactory.isPreAcknowledge();
Modified: trunk/src/main/org/hornetq/jms/server/JMSServerManager.java
===================================================================
--- trunk/src/main/org/hornetq/jms/server/JMSServerManager.java 2009-10-02 13:56:28 UTC (rev 8035)
+++ trunk/src/main/org/hornetq/jms/server/JMSServerManager.java 2009-10-02 15:47:08 UTC (rev 8036)
@@ -141,8 +141,7 @@
String clientID,
long clientFailureCheckPeriod,
long connectionTTL,
- long callTimeout,
- int maxConnections,
+ long callTimeout,
boolean cacheLargeMessagesClient,
int minLargeMessageSize,
int consumerWindowSize,
@@ -174,8 +173,7 @@
long discoveryRefreshTimeout,
long clientFailureCheckPeriod,
long connectionTTL,
- long callTimeout,
- int maxConnections,
+ long callTimeout,
boolean cacheLargeMessagesClient,
int minLargeMessageSize,
int consumerWindowSize,
Modified: trunk/src/main/org/hornetq/jms/server/config/ConnectionFactoryConfiguration.java
===================================================================
--- trunk/src/main/org/hornetq/jms/server/config/ConnectionFactoryConfiguration.java 2009-10-02 13:56:28 UTC (rev 8035)
+++ trunk/src/main/org/hornetq/jms/server/config/ConnectionFactoryConfiguration.java 2009-10-02 15:47:08 UTC (rev 8036)
@@ -63,10 +63,6 @@
void setCallTimeout(long callTimeout);
- int getMaxConnections();
-
- void setMaxConnections(int maxConnections);
-
boolean isCacheLargeMessagesClient();
void setCacheLargeMessagesClient(boolean cacheLargeMessagesClient);
Modified: trunk/src/main/org/hornetq/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java 2009-10-02 13:56:28 UTC (rev 8035)
+++ trunk/src/main/org/hornetq/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java 2009-10-02 15:47:08 UTC (rev 8036)
@@ -30,7 +30,6 @@
*/
public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConfiguration
{
-
// Constants -----------------------------------------------------
// Attributes ----------------------------------------------------
@@ -55,8 +54,6 @@
public long callTimeout = ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT;
- public int maxConnections = ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS;
-
public boolean cacheLargeMessagesClient = ClientSessionFactoryImpl.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT;
public int minLargeMessageSize = ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
@@ -232,16 +229,6 @@
this.callTimeout = callTimeout;
}
- public int getMaxConnections()
- {
- return maxConnections;
- }
-
- public void setMaxConnections(int maxConnections)
- {
- this.maxConnections = maxConnections;
- }
-
public boolean isCacheLargeMessagesClient()
{
return cacheLargeMessagesClient;
Modified: trunk/src/main/org/hornetq/jms/server/impl/JMSServerDeployer.java
===================================================================
--- trunk/src/main/org/hornetq/jms/server/impl/JMSServerDeployer.java 2009-10-02 13:56:28 UTC (rev 8035)
+++ trunk/src/main/org/hornetq/jms/server/impl/JMSServerDeployer.java 2009-10-02 15:47:08 UTC (rev 8036)
@@ -139,8 +139,7 @@
boolean blockOnAcknowledge = getBoolean(e, "block-on-acknowledge", ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_ACKNOWLEDGE);
boolean blockOnNonPersistentSend = getBoolean(e, "block-on-non-persistent-send", ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND);
boolean blockOnPersistentSend = getBoolean(e, "block-on-persistent-send", ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_PERSISTENT_SEND);
- boolean autoGroup = getBoolean(e, "auto-group", ClientSessionFactoryImpl.DEFAULT_AUTO_GROUP);
- int maxConnections = getInteger(e, "max-connections", ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS, GT_ZERO);
+ boolean autoGroup = getBoolean(e, "auto-group", ClientSessionFactoryImpl.DEFAULT_AUTO_GROUP);
boolean preAcknowledge = getBoolean(e, "pre-acknowledge", ClientSessionFactoryImpl.DEFAULT_PRE_ACKNOWLEDGE);
long retryInterval = getLong(e, "retry-interval", ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL, GT_ZERO);
double retryIntervalMultiplier = getDouble(e, "retry-interval-multiplier", ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL_MULTIPLIER, GT_ZERO);
@@ -236,8 +235,7 @@
discoveryGroupConfiguration.getRefreshTimeout(),
clientFailureCheckPeriod,
connectionTTL,
- callTimeout,
- maxConnections,
+ callTimeout,
cacheLargeMessagesClient,
minLargeMessageSize,
consumerWindowSize,
@@ -270,8 +268,7 @@
clientID,
clientFailureCheckPeriod,
connectionTTL,
- callTimeout,
- maxConnections,
+ callTimeout,
cacheLargeMessagesClient,
minLargeMessageSize,
consumerWindowSize,
Modified: trunk/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java 2009-10-02 13:56:28 UTC (rev 8035)
+++ trunk/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java 2009-10-02 15:47:08 UTC (rev 8036)
@@ -365,8 +365,7 @@
String clientID,
long clientFailureCheckPeriod,
long connectionTTL,
- long callTimeout,
- int maxConnections,
+ long callTimeout,
boolean cacheLargeMessagesClient,
int minLargeMessageSize,
int consumerWindowSize,
@@ -399,8 +398,7 @@
cf.setClientID(clientID);
cf.setClientFailureCheckPeriod(clientFailureCheckPeriod);
cf.setConnectionTTL(connectionTTL);
- cf.setCallTimeout(callTimeout);
- cf.setMaxConnections(maxConnections);
+ cf.setCallTimeout(callTimeout);
cf.setCacheLargeMessagesClient(cacheLargeMessagesClient);
cf.setMinLargeMessageSize(minLargeMessageSize);
cf.setConsumerWindowSize(consumerWindowSize);
@@ -435,8 +433,7 @@
long discoveryRefreshTimeout,
long clientFailureCheckPeriod,
long connectionTTL,
- long callTimeout,
- int maxConnections,
+ long callTimeout,
boolean cacheLargeMessagesClient,
int minLargeMessageSize,
int consumerWindowSize,
@@ -471,8 +468,7 @@
cf.setDiscoveryRefreshTimeout(discoveryRefreshTimeout);
cf.setClientFailureCheckPeriod(clientFailureCheckPeriod);
cf.setConnectionTTL(connectionTTL);
- cf.setCallTimeout(callTimeout);
- cf.setMaxConnections(maxConnections);
+ cf.setCallTimeout(callTimeout);
cf.setCacheLargeMessagesClient(cacheLargeMessagesClient);
cf.setMinLargeMessageSize(minLargeMessageSize);
cf.setConsumerWindowSize(consumerWindowSize);
@@ -752,8 +748,7 @@
config.getDiscoveryRefreshTimeout(),
config.getClientFailureCheckPeriod(),
config.getConnectionTTL(),
- config.getCallTimeout(),
- config.getMaxConnections(),
+ config.getCallTimeout(),
config.isCacheLargeMessagesClient(),
config.getMinLargeMessageSize(),
config.getConsumerWindowSize(),
@@ -786,8 +781,7 @@
config.getClientID(),
config.getClientFailureCheckPeriod(),
config.getConnectionTTL(),
- config.getCallTimeout(),
- config.getMaxConnections(),
+ config.getCallTimeout(),
config.isCacheLargeMessagesClient(),
config.getMinLargeMessageSize(),
config.getConsumerWindowSize(),
Modified: trunk/src/main/org/hornetq/jms/server/management/ConnectionFactoryControl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/server/management/ConnectionFactoryControl.java 2009-10-02 13:56:28 UTC (rev 8035)
+++ trunk/src/main/org/hornetq/jms/server/management/ConnectionFactoryControl.java 2009-10-02 15:47:08 UTC (rev 8036)
@@ -60,8 +60,6 @@
boolean isAutoGroup();
- int getMaxConnections();
-
long getRetryInterval();
double getRetryIntervalMultiplier();
Modified: trunk/src/main/org/hornetq/jms/server/management/JMSServerControl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/server/management/JMSServerControl.java 2009-10-02 13:56:28 UTC (rev 8035)
+++ trunk/src/main/org/hornetq/jms/server/management/JMSServerControl.java 2009-10-02 15:47:08 UTC (rev 8036)
@@ -98,8 +98,7 @@
String clientID,
long clientFailureCheckPeriod,
long connectionTTL,
- long callTimeout,
- int maxConnections,
+ long callTimeout,
boolean cacheLargeMessageClient,
int minLargeMessageSize,
int consumerWindowSize,
@@ -132,8 +131,7 @@
@Parameter(name = "clientID") String clientID,
@Parameter(name = "clientFailureCheckPeriod") long clientFailureCheckPeriod,
@Parameter(name = "connectionTTL") long connectionTTL,
- @Parameter(name = "callTimeout") long callTimeout,
- @Parameter(name = "maxConnections") int maxConnections,
+ @Parameter(name = "callTimeout") long callTimeout,
@Parameter(name = "cacheLargemessageClient") boolean cacheLargeMessageClient,
@Parameter(name = "minLargeMessageSize") int minLargeMessageSize,
@Parameter(name = "consumerWindowSize") int consumerWindowSize,
@@ -178,8 +176,7 @@
long discoveryRefreshTimeout,
long clientFailureCheckPeriod,
long connectionTTL,
- long callTimeout,
- int maxConnections,
+ long callTimeout,
boolean cacheLargeMessageClient,
int minLargeMessageSize,
int consumerWindowSize,
@@ -213,8 +210,7 @@
@Parameter(name = "discoveryRefreshTimeout") long discoveryRefreshTimeout,
@Parameter(name = "clientFailureCheckPeriod") long clientFailureCheckPeriod,
@Parameter(name = "connectionTTL") long connectionTTL,
- @Parameter(name = "callTimeout") long callTimeout,
- @Parameter(name = "maxConnections") int maxConnections,
+ @Parameter(name = "callTimeout") long callTimeout,
@Parameter(name = "cacheLargemessageClient") boolean cacheLargeMessageClient,
@Parameter(name = "minLargeMessageSize") int minLargeMessageSize,
@Parameter(name = "consumerWindowSize") int consumerWindowSize,
Modified: trunk/src/main/org/hornetq/jms/server/management/impl/JMSConnectionFactoryControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/server/management/impl/JMSConnectionFactoryControlImpl.java 2009-10-02 13:56:28 UTC (rev 8035)
+++ trunk/src/main/org/hornetq/jms/server/management/impl/JMSConnectionFactoryControlImpl.java 2009-10-02 15:47:08 UTC (rev 8036)
@@ -130,11 +130,6 @@
return cf.getConnectionTTL();
}
- public int getMaxConnections()
- {
- return cf.getMaxConnections();
- }
-
public int getReconnectAttempts()
{
return cf.getReconnectAttempts();
Modified: trunk/src/main/org/hornetq/jms/server/management/impl/JMSServerControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/server/management/impl/JMSServerControlImpl.java 2009-10-02 13:56:28 UTC (rev 8035)
+++ trunk/src/main/org/hornetq/jms/server/management/impl/JMSServerControlImpl.java 2009-10-02 15:47:08 UTC (rev 8036)
@@ -225,8 +225,7 @@
final String clientID,
final long clientFailureCheckPeriod,
final long connectionTTL,
- final long callTimeout,
- final int maxConnections,
+ final long callTimeout,
final boolean cacheLargeMessageClient,
final int minLargeMessageSize,
final int consumerWindowSize,
@@ -263,8 +262,7 @@
clientID,
clientFailureCheckPeriod,
connectionTTL,
- callTimeout,
- maxConnections,
+ callTimeout,
cacheLargeMessageClient,
minLargeMessageSize,
consumerWindowSize,
@@ -300,8 +298,7 @@
final String clientID,
final long clientFailureCheckPeriod,
final long connectionTTL,
- final long callTimeout,
- final int maxConnections,
+ final long callTimeout,
final boolean cacheLargeMessageClient,
final int minLargeMessageSize,
final int consumerWindowSize,
@@ -340,8 +337,7 @@
clientID,
clientFailureCheckPeriod,
connectionTTL,
- callTimeout,
- maxConnections,
+ callTimeout,
cacheLargeMessageClient,
minLargeMessageSize,
consumerWindowSize,
@@ -398,8 +394,7 @@
final long discoveryRefreshTimeout,
final long clientFailureCheckPeriod,
final long connectionTTL,
- final long callTimeout,
- final int maxConnections,
+ final long callTimeout,
final boolean cacheLargeMessageClient,
final int minLargeMessageSize,
final int consumerWindowSize,
@@ -434,8 +429,7 @@
discoveryRefreshTimeout,
clientFailureCheckPeriod,
connectionTTL,
- callTimeout,
- maxConnections,
+ callTimeout,
cacheLargeMessageClient,
minLargeMessageSize,
consumerWindowSize,
@@ -471,8 +465,7 @@
final long discoveryRefreshTimeout,
final long clientFailureCheckPeriod,
final long connectionTTL,
- final long callTimeout,
- final int maxConnections,
+ final long callTimeout,
final boolean cacheLargeMessageClient,
final int minLargeMessageSize,
final int consumerWindowSize,
@@ -507,8 +500,7 @@
discoveryRefreshTimeout,
clientFailureCheckPeriod,
connectionTTL,
- callTimeout,
- maxConnections,
+ callTimeout,
cacheLargeMessageClient,
minLargeMessageSize,
consumerWindowSize,
Modified: trunk/src/main/org/hornetq/ra/ConnectionFactoryProperties.java
===================================================================
--- trunk/src/main/org/hornetq/ra/ConnectionFactoryProperties.java 2009-10-02 13:56:28 UTC (rev 8035)
+++ trunk/src/main/org/hornetq/ra/ConnectionFactoryProperties.java 2009-10-02 15:47:08 UTC (rev 8036)
@@ -89,8 +89,6 @@
private Boolean autoGroup;
- private Integer maxConnections;
-
private Boolean preAcknowledge;
private Long retryInterval;
@@ -555,26 +553,7 @@
this.autoGroup = autoGroup;
}
- public Integer getMaxConnections()
- {
- if (trace)
- {
- log.trace("getMaxConnections()");
- }
- hasBeenUpdated = true;
- return maxConnections;
- }
-
- public void setMaxConnections(Integer maxConnections)
- {
- if (trace)
- {
- log.trace("setMaxConnections(" + maxConnections + ")");
- }
- hasBeenUpdated = true;
- this.maxConnections = maxConnections;
- }
-
+
public Boolean isPreAcknowledge()
{
if (trace)
Modified: trunk/src/main/org/hornetq/ra/HornetQRAManagedConnectionFactory.java
===================================================================
--- trunk/src/main/org/hornetq/ra/HornetQRAManagedConnectionFactory.java 2009-10-02 13:56:28 UTC (rev 8035)
+++ trunk/src/main/org/hornetq/ra/HornetQRAManagedConnectionFactory.java 2009-10-02 15:47:08 UTC (rev 8036)
@@ -604,17 +604,7 @@
{
mcfProperties.setAutoGroup(autoGroup);
}
-
- public Integer getMaxConnections()
- {
- return mcfProperties.getMaxConnections();
- }
-
- public void setMaxConnections(Integer maxConnections)
- {
- mcfProperties.setMaxConnections(maxConnections);
- }
-
+
public Boolean isPreAcknowledge()
{
return mcfProperties.isPreAcknowledge();
Modified: trunk/src/main/org/hornetq/ra/HornetQRAMetaData.java
===================================================================
--- trunk/src/main/org/hornetq/ra/HornetQRAMetaData.java 2009-10-02 13:56:28 UTC (rev 8035)
+++ trunk/src/main/org/hornetq/ra/HornetQRAMetaData.java 2009-10-02 15:47:08 UTC (rev 8036)
@@ -81,21 +81,6 @@
}
/**
- * Get the maximum number of connections -- RETURNS 0
- * @return The number
- * @exception ResourceException Thrown if operation fails
- */
- public int getMaxConnections() throws ResourceException
- {
- if (trace)
- {
- log.trace("getMaxConnections()");
- }
-
- return 0;
- }
-
- /**
* Get the user name
* @return The user name
* @exception ResourceException Thrown if operation fails
@@ -109,4 +94,13 @@
return mc.getUserName();
}
+
+ /* (non-Javadoc)
+ * @see javax.resource.spi.ManagedConnectionMetaData#getMaxConnections()
+ */
+ public int getMaxConnections() throws ResourceException
+ {
+ // TODO Auto-generated method stub
+ return 0;
+ }
}
Modified: trunk/src/main/org/hornetq/ra/HornetQResourceAdapter.java
===================================================================
--- trunk/src/main/org/hornetq/ra/HornetQResourceAdapter.java 2009-10-02 13:56:28 UTC (rev 8035)
+++ trunk/src/main/org/hornetq/ra/HornetQResourceAdapter.java 2009-10-02 15:47:08 UTC (rev 8036)
@@ -836,38 +836,8 @@
raProperties.setAutoGroup(autoGroup);
}
-
+
/**
- * Get max connections
- *
- * @return The value
- */
- public Integer getMaxConnections()
- {
- if (trace)
- {
- log.trace("getMaxConnections()");
- }
-
- return raProperties.getMaxConnections();
- }
-
- /**
- * Set max connections
- *
- * @param maxConnections The value
- */
- public void setMaxConnections(final Integer maxConnections)
- {
- if (trace)
- {
- log.trace("setMaxConnections(" + maxConnections + ")");
- }
-
- raProperties.setMaxConnections(maxConnections);
- }
-
- /**
* Get pre acknowledge
*
* @return The value
@@ -1494,12 +1464,7 @@
{
cf.setDupsOKBatchSize(val2);
}
- val2 = overrideProperties.getMaxConnections() != null ? overrideProperties.getMaxConnections()
- : raProperties.getMaxConnections();
- if (val2 != null)
- {
- cf.setMaxConnections(val2);
- }
+
val2 = overrideProperties.getMinLargeMessageSize() != null ? overrideProperties.getMinLargeMessageSize()
: raProperties.getMinLargeMessageSize();
if (val2 != null)
Modified: trunk/tests/config/hornetq-jms-for-JMSServerDeployerTest.xml
===================================================================
--- trunk/tests/config/hornetq-jms-for-JMSServerDeployerTest.xml 2009-10-02 13:56:28 UTC (rev 8035)
+++ trunk/tests/config/hornetq-jms-for-JMSServerDeployerTest.xml 2009-10-02 15:47:08 UTC (rev 8036)
@@ -29,7 +29,6 @@
<discovery-initial-wait-timeout>678</discovery-initial-wait-timeout>
<failover-on-server-shutdown>false</failover-on-server-shutdown>
<connection-load-balancing-policy-class-name>FooClass</connection-load-balancing-policy-class-name>
- <max-connections>12</max-connections>
<reconnect-attempts>34</reconnect-attempts>
<retry-interval>5</retry-interval>
<retry-interval-multiplier>6.0</retry-interval-multiplier>
Modified: trunk/tests/jms-tests/src/org/hornetq/jms/tests/CTSMiscellaneousTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/hornetq/jms/tests/CTSMiscellaneousTest.java 2009-10-02 13:56:28 UTC (rev 8035)
+++ trunk/tests/jms-tests/src/org/hornetq/jms/tests/CTSMiscellaneousTest.java 2009-10-02 15:47:08 UTC (rev 8036)
@@ -23,7 +23,6 @@
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONSUMER_MAX_RATE;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONSUMER_WINDOW_SIZE;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_FAILOVER_ON_SERVER_SHUTDOWN;
-import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_RETRY_INTERVAL;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRE_ACKNOWLEDGE;
@@ -44,7 +43,6 @@
import javax.jms.MessageProducer;
import javax.jms.Session;
-import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
import org.hornetq.core.config.TransportConfiguration;
import org.hornetq.jms.client.HornetQConnectionFactory;
import org.hornetq.utils.Pair;
@@ -93,8 +91,7 @@
null,
DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
DEFAULT_CONNECTION_TTL,
- DEFAULT_CALL_TIMEOUT,
- DEFAULT_MAX_CONNECTIONS,
+ DEFAULT_CALL_TIMEOUT,
DEFAULT_CACHE_LARGE_MESSAGE_CLIENT,
DEFAULT_MIN_LARGE_MESSAGE_SIZE,
DEFAULT_CONSUMER_WINDOW_SIZE,
Modified: trunk/tests/jms-tests/src/org/hornetq/jms/tests/JMSTestCase.java
===================================================================
--- trunk/tests/jms-tests/src/org/hornetq/jms/tests/JMSTestCase.java 2009-10-02 13:56:28 UTC (rev 8035)
+++ trunk/tests/jms-tests/src/org/hornetq/jms/tests/JMSTestCase.java 2009-10-02 15:47:08 UTC (rev 8036)
@@ -23,7 +23,6 @@
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONSUMER_MAX_RATE;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONSUMER_WINDOW_SIZE;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_FAILOVER_ON_SERVER_SHUTDOWN;
-import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_RETRY_INTERVAL;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRE_ACKNOWLEDGE;
@@ -41,7 +40,6 @@
import javax.naming.InitialContext;
-import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
import org.hornetq.core.config.TransportConfiguration;
import org.hornetq.jms.client.HornetQConnectionFactory;
import org.hornetq.utils.Pair;
@@ -89,8 +87,7 @@
null,
DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
DEFAULT_CONNECTION_TTL,
- DEFAULT_CALL_TIMEOUT,
- DEFAULT_MAX_CONNECTIONS,
+ DEFAULT_CALL_TIMEOUT,
DEFAULT_CACHE_LARGE_MESSAGE_CLIENT,
DEFAULT_MIN_LARGE_MESSAGE_SIZE,
DEFAULT_CONSUMER_WINDOW_SIZE,
Modified: trunk/tests/jms-tests/src/org/hornetq/jms/tests/tools/container/LocalTestServer.java
===================================================================
--- trunk/tests/jms-tests/src/org/hornetq/jms/tests/tools/container/LocalTestServer.java 2009-10-02 13:56:28 UTC (rev 8035)
+++ trunk/tests/jms-tests/src/org/hornetq/jms/tests/tools/container/LocalTestServer.java 2009-10-02 15:47:08 UTC (rev 8036)
@@ -22,7 +22,6 @@
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONNECTION_TTL;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONSUMER_MAX_RATE;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_FAILOVER_ON_SERVER_SHUTDOWN;
-import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_RETRY_INTERVAL;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRE_ACKNOWLEDGE;
@@ -48,7 +47,6 @@
import javax.management.ObjectName;
import javax.naming.InitialContext;
-import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
import org.hornetq.core.config.TransportConfiguration;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.management.ObjectNames;
@@ -318,8 +316,7 @@
clientId,
DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
DEFAULT_CONNECTION_TTL,
- DEFAULT_CALL_TIMEOUT,
- DEFAULT_MAX_CONNECTIONS,
+ DEFAULT_CALL_TIMEOUT,
DEFAULT_CACHE_LARGE_MESSAGE_CLIENT,
DEFAULT_MIN_LARGE_MESSAGE_SIZE,
prefetchSize,
Modified: trunk/tests/src/org/hornetq/tests/integration/client/SessionFactoryTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/SessionFactoryTest.java 2009-10-02 13:56:28 UTC (rev 8035)
+++ trunk/tests/src/org/hornetq/tests/integration/client/SessionFactoryTest.java 2009-10-02 15:47:08 UTC (rev 8036)
@@ -118,8 +118,7 @@
ClientSessionFactoryImpl.DEFAULT_DISCOVERY_REFRESH_TIMEOUT,
ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
ClientSessionFactoryImpl.DEFAULT_CONNECTION_TTL,
- ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT,
- ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS,
+ ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT,
ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
ClientSessionFactoryImpl.DEFAULT_CONSUMER_WINDOW_SIZE,
ClientSessionFactoryImpl.DEFAULT_CONSUMER_MAX_RATE,
@@ -179,8 +178,7 @@
ClientSessionFactoryImpl.DEFAULT_DISCOVERY_REFRESH_TIMEOUT,
ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
ClientSessionFactoryImpl.DEFAULT_CONNECTION_TTL,
- ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT,
- ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS,
+ ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT,
ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
ClientSessionFactoryImpl.DEFAULT_CONSUMER_WINDOW_SIZE,
ClientSessionFactoryImpl.DEFAULT_CONSUMER_MAX_RATE,
@@ -230,8 +228,7 @@
ClientSessionFactoryImpl.DEFAULT_DISCOVERY_REFRESH_TIMEOUT,
ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
ClientSessionFactoryImpl.DEFAULT_CONNECTION_TTL,
- ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT,
- ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS,
+ ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT,
ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
ClientSessionFactoryImpl.DEFAULT_CONSUMER_WINDOW_SIZE,
ClientSessionFactoryImpl.DEFAULT_CONSUMER_MAX_RATE,
@@ -281,8 +278,7 @@
ClientSessionFactoryImpl.DEFAULT_DISCOVERY_REFRESH_TIMEOUT,
ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
ClientSessionFactoryImpl.DEFAULT_CONNECTION_TTL,
- ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT,
- ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS,
+ ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT,
ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
ClientSessionFactoryImpl.DEFAULT_CONSUMER_WINDOW_SIZE,
ClientSessionFactoryImpl.DEFAULT_CONSUMER_MAX_RATE,
@@ -332,8 +328,7 @@
ClientSessionFactoryImpl.DEFAULT_DISCOVERY_REFRESH_TIMEOUT,
ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
ClientSessionFactoryImpl.DEFAULT_CONNECTION_TTL,
- ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT,
- ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS,
+ ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT,
ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
ClientSessionFactoryImpl.DEFAULT_CONSUMER_WINDOW_SIZE,
ClientSessionFactoryImpl.DEFAULT_CONSUMER_MAX_RATE,
@@ -380,7 +375,6 @@
long clientFailureCheckPeriod = RandomUtil.randomPositiveLong();
long connectionTTL = RandomUtil.randomPositiveLong();
long callTimeout = RandomUtil.randomPositiveLong();
- int maxConnections = RandomUtil.randomPositiveInt();
int minLargeMessageSize = RandomUtil.randomPositiveInt();
int consumerWindowSize = RandomUtil.randomPositiveInt();
int consumerMaxRate = RandomUtil.randomPositiveInt();
@@ -409,7 +403,6 @@
cf.setClientFailureCheckPeriod(clientFailureCheckPeriod);
cf.setConnectionTTL(connectionTTL);
cf.setCallTimeout(callTimeout);
- cf.setMaxConnections(maxConnections);
cf.setMinLargeMessageSize(minLargeMessageSize);
cf.setConsumerWindowSize(consumerWindowSize);
cf.setConsumerMaxRate(consumerMaxRate);
@@ -437,8 +430,7 @@
assertEquals(discoveryRefreshTimeout, cf.getDiscoveryRefreshTimeout());
assertEquals(clientFailureCheckPeriod, cf.getClientFailureCheckPeriod());
assertEquals(connectionTTL, cf.getConnectionTTL());
- assertEquals(callTimeout, cf.getCallTimeout());
- assertEquals(maxConnections, cf.getMaxConnections());
+ assertEquals(callTimeout, cf.getCallTimeout());
assertEquals(minLargeMessageSize, cf.getMinLargeMessageSize());
assertEquals(consumerWindowSize, cf.getConsumerWindowSize());
assertEquals(consumerMaxRate, cf.getConsumerMaxRate());
@@ -474,8 +466,7 @@
long discoveryRefreshTimeout = RandomUtil.randomPositiveLong();
long clientFailureCheckPeriod = RandomUtil.randomPositiveLong();
long connectionTTL = RandomUtil.randomPositiveLong();
- long callTimeout = RandomUtil.randomPositiveLong();
- int maxConnections = RandomUtil.randomPositiveInt();
+ long callTimeout = RandomUtil.randomPositiveLong();
int minLargeMessageSize = RandomUtil.randomPositiveInt();
int consumerWindowSize = RandomUtil.randomPositiveInt();
int consumerMaxRate = RandomUtil.randomPositiveInt();
@@ -562,15 +553,6 @@
}
try
{
- cf.setMaxConnections(maxConnections);
- fail("Should throw exception");
- }
- catch (IllegalStateException e)
- {
- // OK
- }
- try
- {
cf.setMinLargeMessageSize(minLargeMessageSize);
fail("Should throw exception");
}
@@ -757,7 +739,6 @@
cf.getClientFailureCheckPeriod();
cf.getConnectionTTL();
cf.getCallTimeout();
- cf.getMaxConnections();
cf.getMinLargeMessageSize();
cf.getConsumerWindowSize();
cf.getConsumerMaxRate();
@@ -788,8 +769,7 @@
long discoveryRefreshTimeout,
long clientFailureCheckPeriod,
long connectionTTL,
- long callTimeout,
- int maxConnections,
+ long callTimeout,
int minLargeMessageSize,
int consumerWindowSize,
int consumerMaxRate,
@@ -831,7 +811,6 @@
assertEquals(cf.getClientFailureCheckPeriod(), clientFailureCheckPeriod);
assertEquals(cf.getConnectionTTL(), connectionTTL);
assertEquals(cf.getCallTimeout(), callTimeout);
- assertEquals(cf.getMaxConnections(), maxConnections);
assertEquals(cf.getMinLargeMessageSize(), minLargeMessageSize);
assertEquals(cf.getConsumerWindowSize(), consumerWindowSize);
assertEquals(cf.getConsumerMaxRate(), consumerMaxRate);
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2009-10-02 13:56:28 UTC (rev 8035)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2009-10-02 15:47:08 UTC (rev 8036)
@@ -29,7 +29,7 @@
import org.hornetq.core.client.ClientSession;
import org.hornetq.core.client.ClientSessionFactory;
import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
-import org.hornetq.core.client.impl.ConnectionManagerImpl;
+import org.hornetq.core.client.impl.FailoverManagerImpl;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.TransportConfiguration;
import org.hornetq.core.config.cluster.BroadcastGroupConfiguration;
@@ -135,11 +135,6 @@
private ClientSessionFactory[] sfs = new ClientSessionFactory[MAX_SERVERS];
- protected void failNode(TransportConfiguration conf)
- {
- ConnectionManagerImpl.failAllConnectionsForConnector(conf);
- }
-
protected void waitForMessages(int node, final String address, final int count) throws Exception
{
HornetQServer server = this.servers[node];
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2009-10-02 13:56:28 UTC (rev 8035)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2009-10-02 15:47:08 UTC (rev 8036)
@@ -126,6 +126,8 @@
boolean ok = latch.await(1000, TimeUnit.MILLISECONDS);
assertTrue(ok);
+
+ log.info("got here 1");
ClientConsumer consumer = session.createConsumer(ADDRESS);
@@ -149,6 +151,7 @@
}
}
+ log.info("closing session");
session.close();
assertEquals(0, sf.numSessions());
@@ -1291,117 +1294,7 @@
assertEquals(0, sf.numConnections());
}
- public void testFailoverFailMultipleUnderlyingConnections() throws Exception
- {
- ClientSessionFactoryInternal sf = getSessionFactory();
-
- sf.setBlockOnNonPersistentSend(true);
- sf.setBlockOnPersistentSend(true);
-
- class MyListener implements FailureListener
- {
- CountDownLatch latch = new CountDownLatch(1);
-
- public void connectionFailed(HornetQException me)
- {
- latch.countDown();
- }
- }
-
- ClientSession session1 = sf.createSession(true, true);
- ClientSession session2 = sf.createSession(true, true);
- ClientSession session3 = sf.createSession(true, true);
-
- SimpleString queueName1 = new SimpleString("queue1");
- session1.createQueue(ADDRESS, queueName1, null, true);
- MyListener listener1 = new MyListener();
- session1.addFailureListener(listener1);
-
- SimpleString queueName2 = new SimpleString("queue2");
- session2.createQueue(ADDRESS, queueName2, null, true);
- MyListener listener2 = new MyListener();
- session2.addFailureListener(listener2);
-
- SimpleString queueName3 = new SimpleString("queue3");
- session3.createQueue(ADDRESS, queueName3, null, true);
- MyListener listener3 = new MyListener();
- session3.addFailureListener(listener3);
-
- ClientConsumer consumer1 = session1.createConsumer(queueName1);
- ClientConsumer consumer2 = session1.createConsumer(queueName2);
- ClientConsumer consumer3 = session1.createConsumer(queueName3);
-
- ClientProducer producer = session1.createProducer(ADDRESS);
-
- final int numMessages = 100;
-
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = session1.createClientMessage(true);
-
- message.getBody().writeString("message" + i);
-
- message.putIntProperty("counter", i);
-
- producer.send(message);
- }
-
- // Fail all the connections
-
- RemotingConnection conn1 = ((ClientSessionInternal)session1).getConnection();
- RemotingConnection conn2 = ((ClientSessionInternal)session2).getConnection();
- RemotingConnection conn3 = ((ClientSessionInternal)session3).getConnection();
-
- assertTrue(conn1 != conn2);
- assertTrue(conn2 != conn3);
- assertTrue(conn1 != conn3);
-
- conn2.fail(new HornetQException(HornetQException.NOT_CONNECTED));
- conn3.fail(new HornetQException(HornetQException.NOT_CONNECTED));
-
- // Wait to be informed of failure
-
- boolean ok = listener1.latch.await(1000, TimeUnit.MILLISECONDS);
- assertTrue(ok);
- ok = listener2.latch.await(1000, TimeUnit.MILLISECONDS);
- assertTrue(ok);
- ok = listener3.latch.await(1000, TimeUnit.MILLISECONDS);
- assertTrue(ok);
-
- session1.start();
- session2.start();
- session3.start();
-
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = consumer1.receive(1000);
- assertNotNull(message);
- assertEquals("message" + i, message.getBody().readString());
- assertEquals(i, message.getProperty("counter"));
- message.acknowledge();
-
- message = consumer2.receive(1000);
- assertNotNull(message);
- assertEquals("message" + i, message.getBody().readString());
- assertEquals(i, message.getProperty("counter"));
- message.acknowledge();
-
- message = consumer3.receive(1000);
- assertNotNull(message);
- assertEquals("message" + i, message.getBody().readString());
- assertEquals(i, message.getProperty("counter"));
- message.acknowledge();
- }
-
- session1.close();
- session2.close();
- session3.close();
-
- assertEquals(0, sf.numSessions());
-
- assertEquals(0, sf.numConnections());
- }
-
+
/*
* Browser will get reset to beginning after failover
*/
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/ReattachTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/ReattachTest.java 2009-10-02 13:56:28 UTC (rev 8035)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/ReattachTest.java 2009-10-02 15:47:08 UTC (rev 8036)
@@ -85,13 +85,13 @@
session.createQueue(ADDRESS, ADDRESS, null, false);
- final int numIterations = 100;
+ final int numIterations = 10;
for (int j = 0; j < numIterations; j++)
{
ClientProducer producer = session.createProducer(ADDRESS);
- final int numMessages = 1000;
+ final int numMessages = 100;
for (int i = 0; i < numMessages; i++)
{
@@ -290,19 +290,10 @@
InVMConnector.numberOfFailures = 10;
InVMConnector.failOnCreateConnection = true;
- // We need to fail on different connections.
-
- // We fail on one connection then the connection manager tries to reconnect all connections
- // Then we fail the other, and the connection manager is then called while the reconnection is occurring
- // We can't use the same connection since RemotingConnectionImpl only allows one fail to be in process
- // at same time
-
final RemotingConnection conn = ((ClientSessionInternal)session).getConnection();
final RemotingConnection conn2 = ((ClientSessionInternal)session2).getConnection();
- assertTrue(conn != conn2);
-
Thread t = new Thread()
{
public void run()
@@ -315,8 +306,6 @@
{
}
- log.info("calling fail async");
-
conn2.fail(new HornetQException(HornetQException.NOT_CONNECTED, "Did not receive pong from server"));
}
};
@@ -416,8 +405,6 @@
t.start();
- log.info("Failing connection");
-
conn.fail(new HornetQException(HornetQException.NOT_CONNECTED));
session.start();
@@ -637,7 +624,7 @@
RemotingConnection conn = ((ClientSessionInternal)session).getConnection();
long start = System.currentTimeMillis();
-
+
conn.fail(new HornetQException(HornetQException.NOT_CONNECTED));
session.start();
@@ -662,8 +649,6 @@
long end = System.currentTimeMillis();
double wait = retryInterval + retryMultiplier * retryInterval + retryMultiplier * retryMultiplier * retryInterval;
-
- log.info("wait is " + wait);
assertTrue((end - start) >= wait);
@@ -679,7 +664,7 @@
final double retryMultiplier = 2d;
final int reconnectAttempts = -1;
-
+
final long maxRetryInterval = 1000;
ClientSessionFactoryInternal sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"));
@@ -718,7 +703,7 @@
RemotingConnection conn = ((ClientSessionInternal)session).getConnection();
long start = System.currentTimeMillis();
-
+
conn.fail(new HornetQException(HornetQException.NOT_CONNECTED));
session.start();
@@ -743,11 +728,9 @@
long end = System.currentTimeMillis();
double wait = retryInterval + retryMultiplier * 2 * retryInterval + retryMultiplier;
-
- log.info("wait is " + wait);
assertTrue((end - start) >= wait);
-
+
assertTrue((end - start) < wait + 500);
session.close();
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/FloodServerTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/FloodServerTest.java 2009-10-02 13:56:28 UTC (rev 8035)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/FloodServerTest.java 2009-10-02 15:47:08 UTC (rev 8036)
@@ -19,7 +19,6 @@
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONNECTION_TTL;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONSUMER_MAX_RATE;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONSUMER_WINDOW_SIZE;
-import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRODUCER_MAX_RATE;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRODUCER_WINDOW_SIZE;
@@ -150,8 +149,7 @@
null,
1000,
DEFAULT_CONNECTION_TTL,
- callTimeout,
- DEFAULT_MAX_CONNECTIONS,
+ callTimeout,
DEFAULT_CACHE_LARGE_MESSAGE_CLIENT,
DEFAULT_MIN_LARGE_MESSAGE_SIZE,
DEFAULT_CONSUMER_WINDOW_SIZE,
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/HornetQConnectionFactoryTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/HornetQConnectionFactoryTest.java 2009-10-02 13:56:28 UTC (rev 8035)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/HornetQConnectionFactoryTest.java 2009-10-02 15:47:08 UTC (rev 8036)
@@ -73,8 +73,7 @@
null,
ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
ClientSessionFactoryImpl.DEFAULT_CONNECTION_TTL,
- ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT,
- ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS,
+ ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT,
ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
ClientSessionFactoryImpl.DEFAULT_CONSUMER_WINDOW_SIZE,
ClientSessionFactoryImpl.DEFAULT_CONSUMER_MAX_RATE,
@@ -144,8 +143,7 @@
null,
ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
ClientSessionFactoryImpl.DEFAULT_CONNECTION_TTL,
- ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT,
- ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS,
+ ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT,
ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
ClientSessionFactoryImpl.DEFAULT_CONSUMER_WINDOW_SIZE,
ClientSessionFactoryImpl.DEFAULT_CONSUMER_MAX_RATE,
@@ -192,8 +190,7 @@
null,
ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
ClientSessionFactoryImpl.DEFAULT_CONNECTION_TTL,
- ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT,
- ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS,
+ ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT,
ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
ClientSessionFactoryImpl.DEFAULT_CONSUMER_WINDOW_SIZE,
ClientSessionFactoryImpl.DEFAULT_CONSUMER_MAX_RATE,
@@ -241,8 +238,7 @@
null,
ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
ClientSessionFactoryImpl.DEFAULT_CONNECTION_TTL,
- ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT,
- ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS,
+ ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT,
ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
ClientSessionFactoryImpl.DEFAULT_CONSUMER_WINDOW_SIZE,
ClientSessionFactoryImpl.DEFAULT_CONSUMER_MAX_RATE,
@@ -290,8 +286,7 @@
null,
ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
ClientSessionFactoryImpl.DEFAULT_CONNECTION_TTL,
- ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT,
- ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS,
+ ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT,
ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
ClientSessionFactoryImpl.DEFAULT_CONSUMER_WINDOW_SIZE,
ClientSessionFactoryImpl.DEFAULT_CONSUMER_MAX_RATE,
@@ -336,8 +331,7 @@
long discoveryRefreshTimeout = RandomUtil.randomPositiveLong();
long clientFailureCheckPeriod = RandomUtil.randomPositiveLong();
long connectionTTL = RandomUtil.randomPositiveLong();
- long callTimeout = RandomUtil.randomPositiveLong();
- int maxConnections = RandomUtil.randomPositiveInt();
+ long callTimeout = RandomUtil.randomPositiveLong();
int minLargeMessageSize = RandomUtil.randomPositiveInt();
int consumerWindowSize = RandomUtil.randomPositiveInt();
int consumerMaxRate = RandomUtil.randomPositiveInt();
@@ -365,8 +359,7 @@
cf.setDiscoveryRefreshTimeout(discoveryRefreshTimeout);
cf.setClientFailureCheckPeriod(clientFailureCheckPeriod);
cf.setConnectionTTL(connectionTTL);
- cf.setCallTimeout(callTimeout);
- cf.setMaxConnections(maxConnections);
+ cf.setCallTimeout(callTimeout);
cf.setMinLargeMessageSize(minLargeMessageSize);
cf.setConsumerWindowSize(consumerWindowSize);
cf.setConsumerMaxRate(consumerMaxRate);
@@ -394,8 +387,7 @@
assertEquals(discoveryRefreshTimeout, cf.getDiscoveryRefreshTimeout());
assertEquals(clientFailureCheckPeriod, cf.getClientFailureCheckPeriod());
assertEquals(connectionTTL, cf.getConnectionTTL());
- assertEquals(callTimeout, cf.getCallTimeout());
- assertEquals(maxConnections, cf.getMaxConnections());
+ assertEquals(callTimeout, cf.getCallTimeout());
assertEquals(minLargeMessageSize, cf.getMinLargeMessageSize());
assertEquals(consumerWindowSize, cf.getConsumerWindowSize());
assertEquals(consumerMaxRate, cf.getConsumerMaxRate());
@@ -432,8 +424,7 @@
String clientID = RandomUtil.randomString();
long clientFailureCheckPeriod = RandomUtil.randomPositiveLong();
long connectionTTL = RandomUtil.randomPositiveLong();
- long callTimeout = RandomUtil.randomPositiveLong();
- int maxConnections = RandomUtil.randomPositiveInt();
+ long callTimeout = RandomUtil.randomPositiveLong();
int minLargeMessageSize = RandomUtil.randomPositiveInt();
int consumerWindowSize = RandomUtil.randomPositiveInt();
int consumerMaxRate = RandomUtil.randomPositiveInt();
@@ -530,15 +521,6 @@
}
try
{
- cf.setMaxConnections(maxConnections);
- fail("Should throw exception");
- }
- catch (IllegalStateException e)
- {
- // OK
- }
- try
- {
cf.setMinLargeMessageSize(minLargeMessageSize);
fail("Should throw exception");
}
@@ -734,8 +716,7 @@
cf.getClientID();
cf.getClientFailureCheckPeriod();
cf.getConnectionTTL();
- cf.getCallTimeout();
- cf.getMaxConnections();
+ cf.getCallTimeout();
cf.getMinLargeMessageSize();
cf.getConsumerWindowSize();
cf.getConsumerMaxRate();
@@ -768,8 +749,7 @@
String clientID,
long clientFailureCheckPeriod,
long connectionTTL,
- long callTimeout,
- int maxConnections,
+ long callTimeout,
int minLargeMessageSize,
int consumerWindowSize,
int consumerMaxRate,
@@ -812,8 +792,7 @@
assertEquals(cf.getClientID(), clientID);
assertEquals(cf.getClientFailureCheckPeriod(), clientFailureCheckPeriod);
assertEquals(cf.getConnectionTTL(), connectionTTL);
- assertEquals(cf.getCallTimeout(), callTimeout);
- assertEquals(cf.getMaxConnections(), maxConnections);
+ assertEquals(cf.getCallTimeout(), callTimeout);
assertEquals(cf.getMinLargeMessageSize(), minLargeMessageSize);
assertEquals(cf.getConsumerWindowSize(), consumerWindowSize);
assertEquals(cf.getConsumerMaxRate(), consumerMaxRate);
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/ManualReconnectionToSingleServerTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/ManualReconnectionToSingleServerTest.java 2009-10-02 13:56:28 UTC (rev 8035)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/ManualReconnectionToSingleServerTest.java 2009-10-02 15:47:08 UTC (rev 8036)
@@ -24,7 +24,6 @@
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONNECTION_TTL;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONSUMER_MAX_RATE;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONSUMER_WINDOW_SIZE;
-import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRE_ACKNOWLEDGE;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRODUCER_MAX_RATE;
@@ -260,8 +259,7 @@
null,
DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
DEFAULT_CONNECTION_TTL,
- callTimeout,
- DEFAULT_MAX_CONNECTIONS,
+ callTimeout,
DEFAULT_CACHE_LARGE_MESSAGE_CLIENT,
DEFAULT_MIN_LARGE_MESSAGE_SIZE,
DEFAULT_CONSUMER_WINDOW_SIZE,
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/client/PreACKJMSTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/client/PreACKJMSTest.java 2009-10-02 13:56:28 UTC (rev 8035)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/client/PreACKJMSTest.java 2009-10-02 15:47:08 UTC (rev 8036)
@@ -24,7 +24,6 @@
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONNECTION_TTL;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONSUMER_MAX_RATE;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONSUMER_WINDOW_SIZE;
-import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_RETRY_INTERVAL;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRODUCER_MAX_RATE;
@@ -42,7 +41,6 @@
import javax.jms.Session;
import javax.jms.TextMessage;
-import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
import org.hornetq.core.config.TransportConfiguration;
import org.hornetq.tests.util.JMSTestBase;
import org.hornetq.utils.Pair;
@@ -225,8 +223,7 @@
null,
DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
DEFAULT_CONNECTION_TTL,
- callTimeout,
- DEFAULT_MAX_CONNECTIONS,
+ callTimeout,
DEFAULT_CACHE_LARGE_MESSAGE_CLIENT,
DEFAULT_MIN_LARGE_MESSAGE_SIZE,
DEFAULT_CONSUMER_WINDOW_SIZE,
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/client/ResendTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/client/ResendTest.java 2009-10-02 13:56:28 UTC (rev 8035)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/client/ResendTest.java 2009-10-02 15:47:08 UTC (rev 8036)
@@ -23,7 +23,6 @@
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONNECTION_TTL;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONSUMER_MAX_RATE;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONSUMER_WINDOW_SIZE;
-import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_RETRY_INTERVAL;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRE_ACKNOWLEDGE;
@@ -264,8 +263,7 @@
null,
DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
DEFAULT_CONNECTION_TTL,
- callTimeout,
- DEFAULT_MAX_CONNECTIONS,
+ callTimeout,
true,
DEFAULT_MIN_LARGE_MESSAGE_SIZE,
DEFAULT_CONSUMER_WINDOW_SIZE,
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/connection/CloseConnectionOnGCTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/connection/CloseConnectionOnGCTest.java 2009-10-02 13:56:28 UTC (rev 8035)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/connection/CloseConnectionOnGCTest.java 2009-10-02 15:47:08 UTC (rev 8036)
@@ -103,8 +103,6 @@
Session sess6 = conn3.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session sess7 = conn3.createSession(false, Session.AUTO_ACKNOWLEDGE);
- assertEquals(ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS, server.getRemotingService().getConnections().size());
-
sess1 = sess2 = sess3 = sess4 = sess5 = sess6 = sess7 = null;
conn1 = null;
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/divert/DivertAndACKClientTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/divert/DivertAndACKClientTest.java 2009-10-02 13:56:28 UTC (rev 8035)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/divert/DivertAndACKClientTest.java 2009-10-02 15:47:08 UTC (rev 8036)
@@ -15,7 +15,6 @@
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_ACK_BATCH_SIZE;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_AUTO_GROUP;
-import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_ACKNOWLEDGE;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_PERSISTENT_SEND;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT;
@@ -24,7 +23,6 @@
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONNECTION_TTL;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONSUMER_MAX_RATE;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONSUMER_WINDOW_SIZE;
-import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_RETRY_INTERVAL;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRE_ACKNOWLEDGE;
@@ -37,7 +35,11 @@
import java.util.ArrayList;
import java.util.List;
-import javax.jms.*;
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.TransportConfiguration;
@@ -161,8 +163,7 @@
null,
DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
DEFAULT_CONNECTION_TTL,
- callTimeout,
- DEFAULT_MAX_CONNECTIONS,
+ callTimeout,
DEFAULT_CACHE_LARGE_MESSAGE_CLIENT,
DEFAULT_MIN_LARGE_MESSAGE_SIZE,
DEFAULT_CONSUMER_WINDOW_SIZE,
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/server/JMSServerDeployerTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/server/JMSServerDeployerTest.java 2009-10-02 13:56:28 UTC (rev 8035)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/server/JMSServerDeployerTest.java 2009-10-02 15:47:08 UTC (rev 8036)
@@ -206,8 +206,7 @@
assertEquals(false, cf.isAutoGroup());
assertEquals(true, cf.isPreAcknowledge());
assertEquals(2345, cf.getConnectionTTL());
- assertEquals(false, cf.isFailoverOnServerShutdown());
- assertEquals(12, cf.getMaxConnections());
+ assertEquals(false, cf.isFailoverOnServerShutdown());
assertEquals(34, cf.getReconnectAttempts());
assertEquals(5, cf.getRetryInterval());
assertEquals(6.0, cf.getRetryIntervalMultiplier());
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlTest.java 2009-10-02 13:56:28 UTC (rev 8035)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlTest.java 2009-10-02 15:47:08 UTC (rev 8036)
@@ -431,8 +431,7 @@
clientID,
ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
ClientSessionFactoryImpl.DEFAULT_CONNECTION_TTL,
- ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT,
- ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS,
+ ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT,
ClientSessionFactoryImpl.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT,
ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
ClientSessionFactoryImpl.DEFAULT_CONSUMER_WINDOW_SIZE,
@@ -477,8 +476,7 @@
clientID,
ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
ClientSessionFactoryImpl.DEFAULT_CONNECTION_TTL,
- ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT,
- ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS,
+ ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT,
ClientSessionFactoryImpl.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT,
ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
ClientSessionFactoryImpl.DEFAULT_CONSUMER_WINDOW_SIZE,
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlUsingJMSTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlUsingJMSTest.java 2009-10-02 13:56:28 UTC (rev 8035)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlUsingJMSTest.java 2009-10-02 15:47:08 UTC (rev 8036)
@@ -99,8 +99,7 @@
long discoveryRefreshTimeout,
long clientFailureCheckPeriod,
long connectionTTL,
- long callTimeout,
- int maxConnections,
+ long callTimeout,
boolean cacheLargeMessageClient,
int minLargeMessageSize,
int consumerWindowSize,
@@ -133,8 +132,7 @@
clientID,
clientFailureCheckPeriod,
connectionTTL,
- callTimeout,
- maxConnections,
+ callTimeout,
cacheLargeMessageClient,
minLargeMessageSize,
consumerWindowSize,
@@ -168,8 +166,7 @@
long discoveryRefreshTimeout,
long clientFailureCheckPeriod,
long connectionTTL,
- long callTimeout,
- int maxConnections,
+ long callTimeout,
boolean cacheLargeMessageClient,
int minLargeMessageSize,
int consumerWindowSize,
@@ -202,8 +199,7 @@
clientID,
clientFailureCheckPeriod,
connectionTTL,
- callTimeout,
- maxConnections,
+ callTimeout,
cacheLargeMessageClient,
minLargeMessageSize,
consumerWindowSize,
@@ -386,8 +382,7 @@
String clientID,
long clientFailureCheckPeriod,
long connectionTTL,
- long callTimeout,
- int maxConnections,
+ long callTimeout,
boolean cacheLargeMessageClient,
int minLargeMessageSize,
int consumerWindowSize,
@@ -421,8 +416,7 @@
clientID,
clientFailureCheckPeriod,
connectionTTL,
- callTimeout,
- maxConnections,
+ callTimeout,
cacheLargeMessageClient,
minLargeMessageSize,
consumerWindowSize,
@@ -457,8 +451,7 @@
String clientID,
long clientFailureCheckPeriod,
long connectionTTL,
- long callTimeout,
- int maxConnections,
+ long callTimeout,
boolean cacheLargeMessageClient,
int minLargeMessageSize,
int consumerWindowSize,
@@ -492,8 +485,7 @@
clientID,
clientFailureCheckPeriod,
connectionTTL,
- callTimeout,
- maxConnections,
+ callTimeout,
cacheLargeMessageClient,
minLargeMessageSize,
consumerWindowSize,
Modified: trunk/tests/src/org/hornetq/tests/integration/remoting/PingTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/remoting/PingTest.java 2009-10-02 13:56:28 UTC (rev 8035)
+++ trunk/tests/src/org/hornetq/tests/integration/remoting/PingTest.java 2009-10-02 15:47:08 UTC (rev 8036)
@@ -19,7 +19,7 @@
import org.hornetq.core.client.ClientSessionFactory;
import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
-import org.hornetq.core.client.impl.ConnectionManagerImpl;
+import org.hornetq.core.client.impl.FailoverManagerImpl;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.TransportConfiguration;
import org.hornetq.core.exception.HornetQException;
@@ -223,7 +223,7 @@
// We need to get it to stop pinging after one
- ((ConnectionManagerImpl)csf.getConnectionManagers()[0]).stopPingingAfterOne();
+ ((FailoverManagerImpl)csf.getFailoverManagers()[0]).stopPingingAfterOne();
RemotingConnection serverConn = null;
Modified: trunk/tests/src/org/hornetq/tests/unit/ra/ResourceAdapterTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/ra/ResourceAdapterTest.java 2009-10-02 13:56:28 UTC (rev 8035)
+++ trunk/tests/src/org/hornetq/tests/unit/ra/ResourceAdapterTest.java 2009-10-02 15:47:08 UTC (rev 8036)
@@ -80,8 +80,7 @@
assertEquals(factory.getDiscoveryInitialWaitTimeout(), ClientSessionFactoryImpl.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT);
assertEquals(factory.getDiscoveryPort(), 0);
assertEquals(factory.getDiscoveryRefreshTimeout(), ClientSessionFactoryImpl.DEFAULT_DISCOVERY_REFRESH_TIMEOUT);
- assertEquals(factory.getDupsOKBatchSize(), ClientSessionFactoryImpl.DEFAULT_ACK_BATCH_SIZE);
- assertEquals(factory.getMaxConnections(), ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS);
+ assertEquals(factory.getDupsOKBatchSize(), ClientSessionFactoryImpl.DEFAULT_ACK_BATCH_SIZE);
assertEquals(factory.getMinLargeMessageSize(), ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
assertEquals(factory.getProducerMaxRate(), ClientSessionFactoryImpl.DEFAULT_PRODUCER_MAX_RATE);
assertEquals(factory.getProducerWindowSize(), ClientSessionFactoryImpl.DEFAULT_PRODUCER_WINDOW_SIZE);
@@ -125,8 +124,7 @@
assertEquals(factory.getDiscoveryInitialWaitTimeout(), ClientSessionFactoryImpl.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT);
assertEquals(factory.getDiscoveryPort(), 0);
assertEquals(factory.getDiscoveryRefreshTimeout(), ClientSessionFactoryImpl.DEFAULT_DISCOVERY_REFRESH_TIMEOUT);
- assertEquals(factory.getDupsOKBatchSize(), ClientSessionFactoryImpl.DEFAULT_ACK_BATCH_SIZE);
- assertEquals(factory.getMaxConnections(), ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS);
+ assertEquals(factory.getDupsOKBatchSize(), ClientSessionFactoryImpl.DEFAULT_ACK_BATCH_SIZE);
assertEquals(factory.getMinLargeMessageSize(), ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
assertEquals(factory.getProducerMaxRate(), ClientSessionFactoryImpl.DEFAULT_PRODUCER_MAX_RATE);
assertEquals(factory.getProducerWindowSize(), ClientSessionFactoryImpl.DEFAULT_PRODUCER_WINDOW_SIZE);
@@ -163,8 +161,7 @@
ra.setDiscoveryInitialWaitTimeout(6l);
ra.setDiscoveryRefreshTimeout(7l);
ra.setDupsOKBatchSize(8);
- ra.setFailoverOnServerShutdown(!ClientSessionFactoryImpl.DEFAULT_FAILOVER_ON_SERVER_SHUTDOWN);
- ra.setMaxConnections(9);
+ ra.setFailoverOnServerShutdown(!ClientSessionFactoryImpl.DEFAULT_FAILOVER_ON_SERVER_SHUTDOWN);
ra.setMinLargeMessageSize(10);
ra.setPreAcknowledge(!ClientSessionFactoryImpl.DEFAULT_PRE_ACKNOWLEDGE);
ra.setProducerMaxRate(11);
@@ -188,8 +185,7 @@
assertEquals(factory.getDiscoveryInitialWaitTimeout(), 6);
assertEquals(factory.getDiscoveryPort(), 0);
assertEquals(factory.getDiscoveryRefreshTimeout(), 7);
- assertEquals(factory.getDupsOKBatchSize(), 8);
- assertEquals(factory.getMaxConnections(), 9);
+ assertEquals(factory.getDupsOKBatchSize(), 8);
assertEquals(factory.getMinLargeMessageSize(), 10);
assertEquals(factory.getProducerMaxRate(), 11);
assertEquals(factory.getProducerWindowSize(), 12);
@@ -227,8 +223,7 @@
connectionFactoryProperties.setDiscoveryInitialWaitTimeout(6l);
connectionFactoryProperties.setDiscoveryRefreshTimeout(7l);
connectionFactoryProperties.setDupsOKBatchSize(8);
- connectionFactoryProperties.setFailoverOnServerShutdown(!ClientSessionFactoryImpl.DEFAULT_FAILOVER_ON_SERVER_SHUTDOWN);
- connectionFactoryProperties.setMaxConnections(9);
+ connectionFactoryProperties.setFailoverOnServerShutdown(!ClientSessionFactoryImpl.DEFAULT_FAILOVER_ON_SERVER_SHUTDOWN);
connectionFactoryProperties.setMinLargeMessageSize(10);
connectionFactoryProperties.setPreAcknowledge(!ClientSessionFactoryImpl.DEFAULT_PRE_ACKNOWLEDGE);
connectionFactoryProperties.setProducerMaxRate(11);
@@ -252,8 +247,7 @@
assertEquals(factory.getDiscoveryInitialWaitTimeout(), 6);
assertEquals(factory.getDiscoveryPort(), 0);
assertEquals(factory.getDiscoveryRefreshTimeout(), 7);
- assertEquals(factory.getDupsOKBatchSize(), 8);
- assertEquals(factory.getMaxConnections(), 9);
+ assertEquals(factory.getDupsOKBatchSize(), 8);
assertEquals(factory.getMinLargeMessageSize(), 10);
assertEquals(factory.getProducerMaxRate(), 11);
assertEquals(factory.getProducerWindowSize(), 12);
Modified: trunk/tests/src/org/hornetq/tests/util/JMSTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/util/JMSTestBase.java 2009-10-02 13:56:28 UTC (rev 8035)
+++ trunk/tests/src/org/hornetq/tests/util/JMSTestBase.java 2009-10-02 15:47:08 UTC (rev 8036)
@@ -24,7 +24,6 @@
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONNECTION_TTL;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONSUMER_MAX_RATE;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONSUMER_WINDOW_SIZE;
-import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_RETRY_INTERVAL;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRE_ACKNOWLEDGE;
@@ -41,7 +40,6 @@
import javax.jms.Queue;
import javax.naming.NamingException;
-import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.TransportConfiguration;
import org.hornetq.core.server.HornetQ;
@@ -204,8 +202,7 @@
null,
DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
DEFAULT_CONNECTION_TTL,
- callTimeout,
- DEFAULT_MAX_CONNECTIONS,
+ callTimeout,
DEFAULT_CACHE_LARGE_MESSAGE_CLIENT,
DEFAULT_MIN_LARGE_MESSAGE_SIZE,
DEFAULT_CONSUMER_WINDOW_SIZE,
More information about the hornetq-commits
mailing list