[jboss-cvs] JBoss Messaging SVN: r5457 - in trunk: src/main/org/jboss/messaging/core/client/impl and 20 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Dec 3 12:13:45 EST 2008
Author: timfox
Date: 2008-12-03 12:13:44 -0500 (Wed, 03 Dec 2008)
New Revision: 5457
Added:
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/ReconnectWithBackupTest.java
Modified:
trunk/src/config/jbm-configuration.xml
trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java
trunk/src/main/org/jboss/messaging/core/config/Configuration.java
trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java
trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingServiceImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionFailoverCompleteMessage.java
trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
trunk/src/main/org/jboss/messaging/jms/client/JBossConnectionFactory.java
trunk/src/main/org/jboss/messaging/jms/client/JMSMessageListenerWrapper.java
trunk/src/main/org/jboss/messaging/jms/server/JMSServerManager.java
trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerDeployer.java
trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerManagerImpl.java
trunk/src/main/org/jboss/messaging/jms/server/management/JMSServerControlMBean.java
trunk/src/main/org/jboss/messaging/jms/server/management/impl/JMSServerControl.java
trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/CTSMiscellaneousTest.java
trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/JMSTestCase.java
trunk/tests/jms-tests/src/org/jboss/test/messaging/tools/container/LocalTestServer.java
trunk/tests/src/org/jboss/messaging/tests/integration/clientcrash/ClientCrashTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/ReconnectTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/ReplicateConnectionFailureTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/SimpleAutomaticFailoverTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/jms/cluster/JMSFailoverTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/jms/consumer/ConsumerTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/jms/management/JMSQueueControlTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/jms/management/JMSUtil.java
trunk/tests/src/org/jboss/messaging/tests/integration/remoting/PingTest.java
Log:
Mainly more reconnect stuff - and if you're reading this in the IRC channel - back to work you lazy slackers!!
Modified: trunk/src/config/jbm-configuration.xml
===================================================================
--- trunk/src/config/jbm-configuration.xml 2008-12-03 14:34:42 UTC (rev 5456)
+++ trunk/src/config/jbm-configuration.xml 2008-12-03 17:13:44 UTC (rev 5457)
@@ -60,7 +60,7 @@
<message-counter-enabled>false</message-counter-enabled>
<connection-scan-period>10000</connection-scan-period>
-
+
<!--how long before timing a transaction out-->
<transaction-timeout>60000</transaction-timeout>
<!--how often to scan for timedout transactions-->
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java 2008-12-03 14:34:42 UTC (rev 5456)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java 2008-12-03 17:13:44 UTC (rev 5457)
@@ -63,17 +63,17 @@
private final Executor sessionExecutor;
private final int clientWindowSize;
-
+
private final int ackBatchSize;
private final Queue<ClientMessage> buffer = new LinkedList<ClientMessage>();
private final Runner runner = new Runner();
-
+
private File directory;
private ClientMessage currentChunkMessage;
-
+
private volatile Thread receiverThread;
private volatile Thread onMessageThread;
@@ -81,23 +81,23 @@
private volatile MessageHandler handler;
private volatile boolean closing;
-
+
private volatile boolean closed;
private volatile int creditsToSend;
private volatile Exception lastException;
-
+
private volatile int ackBytes;
-
+
private volatile ClientMessage lastAckedMessage;
-
+
// Constructors
// ---------------------------------------------------------------------------------
public ClientConsumerImpl(final ClientSessionInternal session,
final long id,
- final int clientWindowSize,
+ final int clientWindowSize,
final int ackBatchSize,
final Executor executor,
final Channel channel,
@@ -112,9 +112,9 @@
sessionExecutor = executor;
this.clientWindowSize = clientWindowSize;
-
+
this.ackBatchSize = ackBatchSize;
-
+
this.directory = directory;
}
@@ -148,16 +148,16 @@
while (true)
{
ClientMessage m = null;
-
+
synchronized (this)
- {
+ {
while ((m = buffer.poll()) == null && !closed && toWait > 0)
{
if (start == -1)
{
- start = System.currentTimeMillis();
+ start = System.currentTimeMillis();
}
-
+
try
{
wait(toWait);
@@ -165,16 +165,16 @@
catch (InterruptedException e)
{
}
-
+
if (m != null || closed)
{
break;
}
-
+
long now = System.currentTimeMillis();
-
+
toWait -= now - start;
-
+
start = now;
}
}
@@ -192,7 +192,7 @@
if (expired)
{
session.expire(id, m.getMessageID());
-
+
if (toWait > 0)
{
continue;
@@ -216,7 +216,7 @@
receiverThread = null;
}
}
-
+
public ClientMessage receive() throws MessagingException
{
return receive(0);
@@ -234,8 +234,8 @@
return handler;
}
- //Must be synchronized since messages may be arriving while handler is being set and might otherwise end
- //up not queueing enough executors - so messages get stranded
+ // Must be synchronized since messages may be arriving while handler is being set and might otherwise end
+ // up not queueing enough executors - so messages get stranded
public synchronized void setMessageHandler(final MessageHandler theHandler) throws MessagingException
{
checkClosed();
@@ -289,7 +289,7 @@
{
return directory != null;
}
-
+
public Exception getLastException()
{
return lastException;
@@ -310,9 +310,9 @@
// This is ok - we just ignore the message
return;
}
-
+
ClientMessage messageToHandle = message;
-
+
if (isFileConsumer())
{
messageToHandle = cloneAsFileMessage(message);
@@ -323,28 +323,29 @@
if (handler != null)
{
// Execute using executor
-
+
buffer.add(messageToHandle);
- queueExecutor();
+ queueExecutor();
}
else
- {
+ {
// Add it to the buffer
buffer.add(messageToHandle);
notify();
}
}
+
public void handleChunk(SessionSendChunkMessage chunk) throws Exception
{
if (closed)
{
return;
}
-
+
flowControl(chunk.getBody().length);
-
+
if (chunk.getHeader() != null)
{
// The Header only comes on the first message, so a buffer has to be created on the client
@@ -394,13 +395,13 @@
{
((ClientFileMessage)currentChunkMessage).closeChannel();
}
-
+
ClientMessage msgToSend = currentChunkMessage;
currentChunkMessage = null;
handleMessage(msgToSend);
- }
+ }
}
-
+
public void clear()
{
synchronized (this)
@@ -425,13 +426,13 @@
{
return creditsToSend;
}
-
+
public void acknowledge(final ClientMessage message) throws MessagingException
{
ackBytes += message.getEncodeSize();
-
+
if (ackBytes >= ackBatchSize)
- {
+ {
doAck(message);
}
else
@@ -439,15 +440,15 @@
lastAckedMessage = message;
}
}
-
+
public void flushAcks() throws MessagingException
{
if (lastAckedMessage != null)
- {
+ {
doAck(lastAckedMessage);
}
}
-
+
// Public7
// ---------------------------------------------------------------------------------------
@@ -524,7 +525,6 @@
// ordering. If we just added a Runnable with the message to the executor immediately as we get it
// we could not do that
-
ClientMessage message;
// Must store handler in local variable since might get set to null
@@ -566,19 +566,19 @@
{
return;
}
-
- //We need an extra flag closing, since we need to prevent any more messages getting queued to execute
- //after this and we can't just set the closed flag to true here, since after/in onmessage the message
- //might be acked and if the consumer is already closed, the ack will be ignored
+
+ // We need an extra flag closing, since we need to prevent any more messages getting queued to execute
+ // after this and we can't just set the closed flag to true here, since after/in onmessage the message
+ // might be acked and if the consumer is already closed, the ack will be ignored
closing = true;
-
+
// Now we wait for any current handler runners to run.
waitForOnMessageToComplete();
-
+
closed = true;
-
+
synchronized (this)
- {
+ {
if (receiverThread != null)
{
// Wake up any receive() thread that might be waiting
@@ -591,7 +591,7 @@
}
flushAcks();
-
+
if (sendCloseMessage)
{
channel.sendBlocking(new SessionConsumerCloseMessage(id));
@@ -602,45 +602,51 @@
session.removeConsumer(this);
}
}
-
+
private void doAck(final ClientMessage message) throws MessagingException
{
ackBytes = 0;
-
+
lastAckedMessage = null;
-
+
session.acknowledge(id, message.getMessageID());
}
-
+
private ClientFileMessage cloneAsFileMessage(final ClientMessage message) throws Exception
{
int propertiesSize = message.getPropertiesEncodeSize();
-
+
MessagingBuffer bufferProperties = message.getBody().createNewBuffer(propertiesSize);
- // FIXME: Find a better way to clone this ClientMessageImpl as ClientFileMessageImpl without using the MessagingBuffer.
- // There is no direct access into the Properties, and I couldn't add a direct cast to this method without loose abstraction
+ // FIXME: Find a better way to clone this ClientMessageImpl as ClientFileMessageImpl without using the
+ // MessagingBuffer.
+ // There is no direct access into the Properties, and I couldn't add a direct cast to this method without loose
+ // abstraction
message.encodeProperties(bufferProperties);
-
+
bufferProperties.rewind();
ClientFileMessageImpl cloneMessage = new ClientFileMessageImpl();
-
+
cloneMessage.decodeProperties(bufferProperties);
-
+
cloneMessage.setDeliveryCount(message.getDeliveryCount());
-
+
cloneMessage.setLargeMessage(message.isLargeMessage());
- cloneMessage.setFile(new File(this.directory, cloneMessage.getMessageID() + "-" + this.session.getName() + "-" + this.getID() + ".jbm"));
-
+ cloneMessage.setFile(new File(this.directory, cloneMessage.getMessageID() + "-" +
+ this.session.getName() +
+ "-" +
+ this.getID() +
+ ".jbm"));
+
addBytesBody(cloneMessage, message.getBody().array());
-
+
cloneMessage.closeChannel();
-
+
return cloneMessage;
}
-
+
private ClientMessage createFileMessage(final MessagingBuffer propertiesBuffer) throws Exception
{
if (isFileConsumer())
@@ -649,10 +655,14 @@
{
directory.mkdirs();
}
-
+
ClientFileMessageImpl message = new ClientFileMessageImpl();
message.decodeProperties(propertiesBuffer);
- message.setFile(new File(this.directory, message.getMessageID() + "-" + this.session.getName() + "-" + this.getID() + ".jbm"));
+ message.setFile(new File(this.directory, message.getMessageID() + "-" +
+ this.session.getName() +
+ "-" +
+ this.getID() +
+ ".jbm"));
message.setLargeMessage(true);
return message;
}
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java 2008-12-03 14:34:42 UTC (rev 5456)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java 2008-12-03 17:13:44 UTC (rev 5457)
@@ -51,6 +51,10 @@
public static final String DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME = "org.jboss.messaging.core.client.impl.RoundRobinConnectionLoadBalancingPolicy";
public static final long DEFAULT_PING_PERIOD = 5000;
+
+ //5 minutes - normally this should be much higher than ping period, this allows clients to re-attach on live
+ //or backup without fear of session having already been closed when connection times out.
+ public static final long DEFAULT_CONNECTION_TTL = 5 * 60000;
// Any message beyond this size is considered a large message (to be sent in chunks)
public static final int DEFAULT_MIN_LARGE_MESSAGE_SIZE = 100 * 1024;
@@ -81,14 +85,14 @@
public static final long DEFAULT_DISCOVERY_INITIAL_WAIT = 10000;
- public static final boolean DEFAULT_RETRY_ON_FAILURE = true;
+ public static final long DEFAULT_RETRY_INTERVAL = 2000;
- public static final long DEFAULT_RETRY_INTERVAL = 5000;
-
public static final double DEFAULT_RETRY_INTERVAL_MULTIPLIER = 1d;
- public static final int DEFAULT_MAX_RETRIES = -1;
+ public static final int DEFAULT_MAX_RETRIES_BEFORE_FAILOVER = 0;
+ public static final int DEFAULT_MAX_RETRIES_AFTER_FAILOVER = 10;
+
// Attributes
// -----------------------------------------------------------------------------------
@@ -97,6 +101,8 @@
private ConnectionManager[] connectionManagerArray;
private final long pingPeriod;
+
+ private final long connectionTTL;
private final long callTimeout;
@@ -135,16 +141,16 @@
private final long initialWaitTimeout;
- //Reconnect params
-
- private final boolean retryOnFailure;
+ // Reconnect params
private final long retryInterval;
private final double retryIntervalMultiplier; // For exponential backoff
- private final int maxRetries;
-
+ private final int maxRetriesBeforeFailover;
+
+ private final int maxRetriesAfterFailover;
+
// Static
// ---------------------------------------------------------------------------------------
@@ -175,6 +181,7 @@
this.initialWaitTimeout = initialWaitTimeout;
this.loadBalancingPolicy = instantiateLoadBalancingPolicy(DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME);
this.pingPeriod = DEFAULT_PING_PERIOD;
+ this.connectionTTL = DEFAULT_CONNECTION_TTL;
this.callTimeout = DEFAULT_CALL_TIMEOUT;
this.consumerWindowSize = DEFAULT_CONSUMER_WINDOW_SIZE;
this.consumerMaxRate = DEFAULT_CONSUMER_MAX_RATE;
@@ -188,10 +195,10 @@
this.maxConnections = DEFAULT_MAX_CONNECTIONS;
this.ackBatchSize = DEFAULT_ACK_BATCH_SIZE;
this.preAcknowledge = DEFAULT_PRE_ACKNOWLEDGE;
- this.retryOnFailure = DEFAULT_RETRY_ON_FAILURE;
this.retryInterval = DEFAULT_RETRY_INTERVAL;
this.retryIntervalMultiplier = DEFAULT_RETRY_INTERVAL_MULTIPLIER;
- this.maxRetries = DEFAULT_MAX_RETRIES;
+ this.maxRetriesBeforeFailover = DEFAULT_MAX_RETRIES_BEFORE_FAILOVER;
+ this.maxRetriesAfterFailover = DEFAULT_MAX_RETRIES_AFTER_FAILOVER;
}
public ClientSessionFactoryImpl(final String discoveryGroupName,
@@ -200,6 +207,7 @@
final long initialWaitTimeout,
final String connectionloadBalancingPolicyClassName,
final long pingPeriod,
+ final long connectionTTL,
final long callTimeout,
final int consumerWindowSize,
final int consumerMaxRate,
@@ -213,10 +221,10 @@
final int maxConnections,
final boolean preAcknowledge,
final int ackBatchSize,
- final boolean retryOnFailure,
final long retryInterval,
final double retryIntervalMultiplier,
- final int maxRetries) throws MessagingException
+ final int maxRetriesBeforeFailover,
+ final int maxRetriesAfterFailover) throws MessagingException
{
try
{
@@ -237,6 +245,7 @@
this.initialWaitTimeout = initialWaitTimeout;
this.loadBalancingPolicy = instantiateLoadBalancingPolicy(connectionloadBalancingPolicyClassName);
this.pingPeriod = pingPeriod;
+ this.connectionTTL = connectionTTL;
this.callTimeout = callTimeout;
this.consumerWindowSize = consumerWindowSize;
this.consumerMaxRate = consumerMaxRate;
@@ -250,15 +259,16 @@
this.maxConnections = maxConnections;
this.ackBatchSize = ackBatchSize;
this.preAcknowledge = preAcknowledge;
- this.retryOnFailure = retryOnFailure;
this.retryInterval = retryInterval;
this.retryIntervalMultiplier = retryIntervalMultiplier;
- this.maxRetries = maxRetries;
+ this.maxRetriesBeforeFailover = maxRetriesBeforeFailover;
+ this.maxRetriesAfterFailover = maxRetriesAfterFailover;
}
public ClientSessionFactoryImpl(final List<Pair<TransportConfiguration, TransportConfiguration>> connectors,
final String connectionloadBalancingPolicyClassName,
final long pingPeriod,
+ final long connectionTTL,
final long callTimeout,
final int consumerWindowSize,
final int consumerMaxRate,
@@ -272,13 +282,14 @@
final int maxConnections,
final boolean preAcknowledge,
final int ackBatchSize,
- final boolean retryOnFailure,
final long retryInterval,
final double retryIntervalMultiplier,
- final int maxRetries)
+ final int maxRetriesBeforeFailover,
+ final int maxRetriesAfterFailover)
{
this.loadBalancingPolicy = instantiateLoadBalancingPolicy(connectionloadBalancingPolicyClassName);
this.pingPeriod = pingPeriod;
+ this.connectionTTL = connectionTTL;
this.callTimeout = callTimeout;
this.consumerWindowSize = consumerWindowSize;
this.consumerMaxRate = consumerMaxRate;
@@ -292,10 +303,10 @@
this.maxConnections = maxConnections;
this.ackBatchSize = ackBatchSize;
this.preAcknowledge = preAcknowledge;
- this.retryOnFailure = retryOnFailure;
this.retryInterval = retryInterval;
this.retryIntervalMultiplier = retryIntervalMultiplier;
- this.maxRetries = maxRetries;
+ this.maxRetriesBeforeFailover = maxRetriesBeforeFailover;
+ this.maxRetriesAfterFailover = maxRetriesAfterFailover;
this.initialWaitTimeout = -1;
@@ -306,10 +317,11 @@
maxConnections,
callTimeout,
pingPeriod,
- retryOnFailure,
+ connectionTTL,
retryInterval,
retryIntervalMultiplier,
- maxRetries);
+ maxRetriesBeforeFailover,
+ maxRetriesAfterFailover);
connectionManagerMap.put(pair, cm);
}
@@ -318,14 +330,17 @@
this.discoveryGroup = null;
}
-
- public ClientSessionFactoryImpl(final TransportConfiguration connectorConfig,
+
+ public ClientSessionFactoryImpl(final TransportConfiguration connectorConfig,
+ final TransportConfiguration backupConnectorConfig,
final long retryInterval,
final double retryIntervalMultiplier,
- final int maxRetries)
+ final int maxRetriesBeforeFailover,
+ final int maxRetriesAfterFailover)
{
this.loadBalancingPolicy = new FirstElementConnectionLoadBalancingPolicy();
this.pingPeriod = DEFAULT_PING_PERIOD;
+ this.connectionTTL = DEFAULT_CONNECTION_TTL;
this.callTimeout = DEFAULT_CALL_TIMEOUT;
this.consumerWindowSize = DEFAULT_CONSUMER_WINDOW_SIZE;
this.consumerMaxRate = DEFAULT_CONSUMER_MAX_RATE;
@@ -339,14 +354,64 @@
this.maxConnections = DEFAULT_MAX_CONNECTIONS;
this.ackBatchSize = DEFAULT_ACK_BATCH_SIZE;
this.preAcknowledge = DEFAULT_PRE_ACKNOWLEDGE;
- this.retryOnFailure = true;
this.retryInterval = retryInterval;
this.retryIntervalMultiplier = retryIntervalMultiplier;
- this.maxRetries = maxRetries;
+ this.maxRetriesBeforeFailover = maxRetriesBeforeFailover;
+ this.maxRetriesAfterFailover = maxRetriesAfterFailover;
this.initialWaitTimeout = -1;
Pair<TransportConfiguration, TransportConfiguration> pair = new Pair<TransportConfiguration, TransportConfiguration>(connectorConfig,
+ backupConnectorConfig);
+
+ ConnectionManager cm = new ConnectionManagerImpl(pair.a,
+ pair.b,
+ maxConnections,
+ callTimeout,
+ pingPeriod,
+ connectionTTL,
+ retryInterval,
+ retryIntervalMultiplier,
+ maxRetriesBeforeFailover,
+ maxRetriesAfterFailover);
+
+ connectionManagerMap.put(pair, cm);
+
+ updateConnectionManagerArray();
+
+ discoveryGroup = null;
+ }
+
+ public ClientSessionFactoryImpl(final TransportConfiguration connectorConfig,
+ final long retryInterval,
+ final double retryIntervalMultiplier,
+ final int maxRetriesBeforeFailover,
+ final int maxRetriesAfterFailover)
+ {
+ this.loadBalancingPolicy = new FirstElementConnectionLoadBalancingPolicy();
+ this.pingPeriod = DEFAULT_PING_PERIOD;
+ this.connectionTTL = DEFAULT_CONNECTION_TTL;
+ this.callTimeout = DEFAULT_CALL_TIMEOUT;
+ this.consumerWindowSize = DEFAULT_CONSUMER_WINDOW_SIZE;
+ this.consumerMaxRate = DEFAULT_CONSUMER_MAX_RATE;
+ this.sendWindowSize = DEFAULT_SEND_WINDOW_SIZE;
+ this.producerMaxRate = DEFAULT_PRODUCER_MAX_RATE;
+ this.blockOnAcknowledge = DEFAULT_BLOCK_ON_ACKNOWLEDGE;
+ this.blockOnNonPersistentSend = DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND;
+ this.blockOnPersistentSend = DEFAULT_BLOCK_ON_PERSISTENT_SEND;
+ this.minLargeMessageSize = DEFAULT_MIN_LARGE_MESSAGE_SIZE;
+ this.autoGroup = DEFAULT_AUTO_GROUP;
+ this.maxConnections = DEFAULT_MAX_CONNECTIONS;
+ this.ackBatchSize = DEFAULT_ACK_BATCH_SIZE;
+ this.preAcknowledge = DEFAULT_PRE_ACKNOWLEDGE;
+ this.retryInterval = retryInterval;
+ this.retryIntervalMultiplier = retryIntervalMultiplier;
+ this.maxRetriesBeforeFailover = maxRetriesBeforeFailover;
+ this.maxRetriesAfterFailover = maxRetriesAfterFailover;
+
+ this.initialWaitTimeout = -1;
+
+ Pair<TransportConfiguration, TransportConfiguration> pair = new Pair<TransportConfiguration, TransportConfiguration>(connectorConfig,
null);
ConnectionManager cm = new ConnectionManagerImpl(pair.a,
@@ -354,10 +419,11 @@
maxConnections,
callTimeout,
pingPeriod,
- retryOnFailure,
+ connectionTTL,
retryInterval,
retryIntervalMultiplier,
- maxRetries);
+ maxRetriesBeforeFailover,
+ maxRetriesAfterFailover);
connectionManagerMap.put(pair, cm);
@@ -372,6 +438,7 @@
this.loadBalancingPolicy = new FirstElementConnectionLoadBalancingPolicy();
this.pingPeriod = DEFAULT_PING_PERIOD;
this.callTimeout = DEFAULT_CALL_TIMEOUT;
+ this.connectionTTL = DEFAULT_CONNECTION_TTL;
this.consumerWindowSize = DEFAULT_CONSUMER_WINDOW_SIZE;
this.consumerMaxRate = DEFAULT_CONSUMER_MAX_RATE;
this.sendWindowSize = DEFAULT_SEND_WINDOW_SIZE;
@@ -384,10 +451,10 @@
this.maxConnections = DEFAULT_MAX_CONNECTIONS;
this.ackBatchSize = DEFAULT_ACK_BATCH_SIZE;
this.preAcknowledge = DEFAULT_PRE_ACKNOWLEDGE;
- this.retryOnFailure = DEFAULT_RETRY_ON_FAILURE;
this.retryInterval = DEFAULT_RETRY_INTERVAL;
this.retryIntervalMultiplier = DEFAULT_RETRY_INTERVAL_MULTIPLIER;
- this.maxRetries = DEFAULT_MAX_RETRIES;
+ this.maxRetriesBeforeFailover = DEFAULT_MAX_RETRIES_BEFORE_FAILOVER;
+ this.maxRetriesAfterFailover = DEFAULT_MAX_RETRIES_AFTER_FAILOVER;
this.initialWaitTimeout = -1;
@@ -399,10 +466,11 @@
maxConnections,
callTimeout,
pingPeriod,
- retryOnFailure,
+ connectionTTL,
retryInterval,
retryIntervalMultiplier,
- maxRetries);
+ maxRetriesBeforeFailover,
+ maxRetriesAfterFailover);
connectionManagerMap.put(pair, cm);
@@ -415,6 +483,7 @@
final TransportConfiguration backupConfig,
final String connectionloadBalancingPolicyClassName,
final long pingPeriod,
+ final long connectionTTL,
final long callTimeout,
final int consumerWindowSize,
final int consumerMaxRate,
@@ -428,13 +497,14 @@
final int maxConnections,
final boolean preAcknowledge,
final int ackBatchSize,
- final boolean retryOnFailure,
final long retryInterval,
final double retryIntervalMultiplier,
- final int maxRetries)
+ final int maxRetriesBeforeFailover,
+ final int maxRetriesAfterFailover)
{
this.loadBalancingPolicy = instantiateLoadBalancingPolicy(connectionloadBalancingPolicyClassName);
this.pingPeriod = pingPeriod;
+ this.connectionTTL = connectionTTL;
this.callTimeout = callTimeout;
this.consumerWindowSize = consumerWindowSize;
this.consumerMaxRate = consumerMaxRate;
@@ -448,25 +518,25 @@
this.maxConnections = maxConnections;
this.ackBatchSize = ackBatchSize;
this.preAcknowledge = preAcknowledge;
- this.retryOnFailure = retryOnFailure;
this.retryInterval = retryInterval;
this.retryIntervalMultiplier = retryIntervalMultiplier;
- this.maxRetries = maxRetries;
+ this.maxRetriesBeforeFailover = maxRetriesBeforeFailover;
+ this.maxRetriesAfterFailover = maxRetriesAfterFailover;
this.initialWaitTimeout = -1;
Pair<TransportConfiguration, TransportConfiguration> pair = new Pair<TransportConfiguration, TransportConfiguration>(connectorConfig,
backupConfig);
-
ConnectionManager cm = new ConnectionManagerImpl(pair.a,
pair.b,
maxConnections,
callTimeout,
pingPeriod,
- retryOnFailure,
+ connectionTTL,
retryInterval,
retryIntervalMultiplier,
- maxRetries);
+ maxRetriesBeforeFailover,
+ maxRetriesAfterFailover);
connectionManagerMap.put(pair, cm);
@@ -723,10 +793,11 @@
maxConnections,
callTimeout,
pingPeriod,
- retryOnFailure,
+ connectionTTL,
retryInterval,
retryIntervalMultiplier,
- maxRetries);
+ maxRetriesBeforeFailover,
+ maxRetriesAfterFailover);
connectionManagerMap.put(connectorPair, connectionManager);
}
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2008-12-03 14:34:42 UTC (rev 5456)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2008-12-03 17:13:44 UTC (rev 5457)
@@ -690,7 +690,7 @@
{
return;
}
-
+
// We lock the channel to prevent any packets to be added to the resend
// cache during the failover process
channel.lock();
@@ -734,7 +734,7 @@
channel.send(new SessionFailoverCompleteMessage(name));
// Now we can add a failure listener since if a further failure occurs we cleanup since no backup any more
- remotingConnection.addFailureListener(this);
+ //remotingConnection.addFailureListener(this);
}
// XAResource implementation
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java 2008-12-03 14:34:42 UTC (rev 5456)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java 2008-12-03 17:13:44 UTC (rev 5457)
@@ -93,6 +93,8 @@
private final long callTimeout;
private final long pingPeriod;
+
+ private final long connectionTTL;
private final Map<ClientSessionInternal, RemotingConnection> sessions = new HashMap<ClientSessionInternal, RemotingConnection>();
@@ -116,14 +118,14 @@
private Object failConnectionLock = new Object();
- private final boolean retryOnFailure;
-
private final long retryInterval;
private final double retryIntervalMultiplier; // For exponential backoff
+
+ private final int maxRetriesBeforeFailover;
+
+ private final int maxRetriesAfterFailover;
- private final int maxRetries;
-
// Static
// ---------------------------------------------------------------------------------------
@@ -134,11 +136,12 @@
final TransportConfiguration backupConfig,
final int maxConnections,
final long callTimeout,
- final long pingPeriod,
- final boolean retryOnFailure,
+ final long pingPeriod,
+ final long connectionTTL,
final long retryInterval,
- final double retryIntervalMultiplier,
- final int maxRetries)
+ final double retryIntervalMultiplier,
+ final int maxRetriesBeforeFailover,
+ final int maxRetriesAfterFailover)
{
connectorFactory = instantiateConnectorFactory(connectorConfig.getFactoryClassName());
@@ -163,13 +166,15 @@
this.pingPeriod = pingPeriod;
- this.retryOnFailure = retryOnFailure;
+ this.connectionTTL = connectionTTL;
this.retryInterval = retryInterval;
this.retryIntervalMultiplier = retryIntervalMultiplier;
+
+ this.maxRetriesBeforeFailover = maxRetriesBeforeFailover;
- this.maxRetries = maxRetries;
+ this.maxRetriesAfterFailover = maxRetriesAfterFailover;
}
// ConnectionLifeCycleListener implementation --------------------
@@ -275,7 +280,7 @@
// So we just need to return our connections and flag for retry
- this.returnConnection(connection.getID());
+ returnConnection(connection.getID());
retry = true;
}
@@ -391,7 +396,7 @@
// FailureListener implementation --------------------------------------------------------
public void connectionFailed(final MessagingException me)
- {
+ {
if (me.getCode() == MessagingException.OBJECT_CLOSED)
{
// The server has closed the connection. We don't want failover to occur in this case -
@@ -428,16 +433,9 @@
// It can then release the channel 1 lock, and retry (which will cause locking on failoverLock
// until failover is complete
- if (backupConnectorFactory != null || retryOnFailure)
+ if (backupConnectorFactory != null || maxRetriesBeforeFailover != 0 || maxRetriesAfterFailover != 0)
{
- if (backupConnectorFactory != null)
- {
- log.info("Commencing automatic failover");
- }
- else
- {
- log.info("Will attempt reconnection");
- }
+ log.info("Commencing automatic failover / reconnection");
lockAllChannel1s();
@@ -473,120 +471,164 @@
}
}
}
-
+
// Now we absolutely know that no threads are executing in or blocked in createSession, and no
// more will execute it until failover is complete
// So.. do failover / reconnection
-
+
Set<RemotingConnection> oldConnections = new HashSet<RemotingConnection>();
for (ConnectionEntry entry : connections.values())
{
oldConnections.add(entry.connection);
}
-
+
connections.clear();
refCount = 0;
mapIterator = null;
-
- if (backupConnectorFactory != null)
+
+ boolean done = false;
+
+ if (maxRetriesBeforeFailover != 0)
{
- connectorFactory = backupConnectorFactory;
-
- transportParams = backupTransportParams;
- }
-
- backupConnectorFactory = null;
-
- backupTransportParams = null;
-
- // We fail over sessions per connection to ensure there is the same mapping of channel id
- // on live and backup connections
-
- Map<RemotingConnection, List<ClientSessionInternal>> sessionsPerConnection = new HashMap<RemotingConnection, List<ClientSessionInternal>>();
-
- for (Map.Entry<ClientSessionInternal, RemotingConnection> entry : sessions.entrySet())
- {
- ClientSessionInternal session = entry.getKey();
-
- RemotingConnection connection = entry.getValue();
-
- List<ClientSessionInternal> sessions = sessionsPerConnection.get(connection);
-
- if (sessions == null)
+ //First try reconnecting to current node if configured to do this
+
+ done = reconnect(maxRetriesBeforeFailover);
+
+ if (done)
{
- sessions = new ArrayList<ClientSessionInternal>();
-
- sessionsPerConnection.put(connection, sessions);
+ log.info("reconnected to original node");
}
-
- sessions.add(session);
}
-
- boolean ok = true;
- for (Map.Entry<RemotingConnection, List<ClientSessionInternal>> entry : sessionsPerConnection.entrySet())
+ if (!done)
{
- List<ClientSessionInternal> theSessions = entry.getValue();
+ //If didn't reconnect to current node then try failover to backup
- RemotingConnection backupConnection = getConnectionWithRetry(theSessions);
-
- if (backupConnection == null)
+ int retries = maxRetriesAfterFailover;
+
+ if (backupConnectorFactory != null)
{
- log.warn("Failed to reconnect to server.");
+ connectorFactory = backupConnectorFactory;
+
+ transportParams = backupTransportParams;
- ok = false;
+ if (maxRetriesAfterFailover == 0)
+ {
+ retries = 1;
+ }
- break;
+ log.info("Failing over to backup");
}
+ else
+ {
+ log.info("Attempting reconnection");
+ }
+
+ backupConnectorFactory = null;
+
+ backupTransportParams = null;
- for (ClientSessionInternal session : theSessions)
+ done = reconnect(retries);
+
+ if (done)
{
- session.handleFailover(backupConnection);
-
- sessions.put(session, backupConnection);
+ log.info("Successfully reconnected");
}
}
for (RemotingConnection connection : oldConnections)
{
connection.destroy();
- }
+ }
+ }
+ }
+ }
- if (ok)
- {
- log.info("Failover complete");
- }
+ private boolean reconnect(final int retries)
+ {
+ // We fail over sessions per connection to ensure there is the same mapping of channel id
+ // on live and backup connections
+
+ Map<RemotingConnection, List<ClientSessionInternal>> sessionsPerConnection = new HashMap<RemotingConnection, List<ClientSessionInternal>>();
+
+ for (Map.Entry<ClientSessionInternal, RemotingConnection> entry : sessions.entrySet())
+ {
+ ClientSessionInternal session = entry.getKey();
+
+ RemotingConnection connection = entry.getValue();
+
+ List<ClientSessionInternal> sessions = sessionsPerConnection.get(connection);
+
+ if (sessions == null)
+ {
+ sessions = new ArrayList<ClientSessionInternal>();
+
+ sessionsPerConnection.put(connection, sessions);
}
+
+ sessions.add(session);
}
+
+ boolean ok = true;
+
+ for (Map.Entry<RemotingConnection, List<ClientSessionInternal>> entry : sessionsPerConnection.entrySet())
+ {
+ List<ClientSessionInternal> theSessions = entry.getValue();
+
+ RemotingConnection backupConnection = getConnectionWithRetry(theSessions, retries);
+
+ if (backupConnection == null)
+ {
+ log.warn("Failed to reconnect to server.");
+
+ ok = false;
+
+ break;
+ }
+
+ backupConnection.addFailureListener(this);
+
+ for (ClientSessionInternal session : theSessions)
+ {
+ sessions.put(session, backupConnection);
+ }
+ }
+
+ if (ok)
+ {
+ //If all connections got ok, then handle failover
+ for (Map.Entry<ClientSessionInternal, RemotingConnection> entry: sessions.entrySet())
+ {
+ entry.getKey().handleFailover(entry.getValue());
+ }
+ }
+
+ return ok;
}
-
- private RemotingConnection getConnectionWithRetry(final List<ClientSessionInternal> sessions)
+
+ private RemotingConnection getConnectionWithRetry(final List<ClientSessionInternal> sessions, final int retries)
{
long interval = retryInterval;
int count = 0;
- log.info("Getting connection with retry");
-
while (true)
{
RemotingConnection connection = getConnection(sessions.size());
-
- log.info("Got connection " + connection);
-
+
if (connection == null)
{
//Failed to get backup connection
- if (retryOnFailure)
+ if (retries != 0)
{
- if (maxRetries != -1 && count == maxRetries)
+ if (retries != -1 && count == retries)
{
- log.warn("Retried " + maxRetries + " times to reconnect. Now giving up.");
+ log.warn("Retried " + retries + " times to reconnect. Now giving up.");
return null;
}
@@ -676,7 +718,7 @@
return null;
}
- conn = new RemotingConnectionImpl(tc, callTimeout, pingPeriod, pingExecutor, null);
+ conn = new RemotingConnectionImpl(tc, callTimeout, pingPeriod, connectionTTL, pingExecutor, null);
handler.conn = conn;
Modified: trunk/src/main/org/jboss/messaging/core/config/Configuration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/Configuration.java 2008-12-03 14:34:42 UTC (rev 5456)
+++ trunk/src/main/org/jboss/messaging/core/config/Configuration.java 2008-12-03 17:13:44 UTC (rev 5457)
@@ -81,9 +81,13 @@
void setInterceptorClassNames(List<String> interceptors);
long getConnectionScanPeriod();
-
+
void setConnectionScanPeriod(long scanPeriod);
+
+ long getConnectionTTLOverride();
+ void setConnectionTTLOverride(long ttl);
+
Set<TransportConfiguration> getAcceptorConfigurations();
void setAcceptorConfigurations(Set<TransportConfiguration> infos);
Modified: trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java 2008-12-03 14:34:42 UTC (rev 5456)
+++ trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java 2008-12-03 17:13:44 UTC (rev 5457)
@@ -54,6 +54,8 @@
public static final boolean DEFAULT_JMX_MANAGEMENT_ENABLED = true;
public static final long DEFAULT_CONNECTION_SCAN_PERIOD = 1000;
+
+ public static final long DEFAULT_CONNECTION_TTL_OVERRIDE = -1;
public static final String DEFAULT_BINDINGS_DIRECTORY = "data/bindings";
@@ -126,6 +128,8 @@
protected boolean jmxManagementEnabled = DEFAULT_JMX_MANAGEMENT_ENABLED;
protected long connectionScanPeriod = DEFAULT_CONNECTION_SCAN_PERIOD;
+
+ protected long connectionTTLOverride = DEFAULT_CONNECTION_TTL_OVERRIDE;
protected long messageExpiryScanPeriod = DEFAULT_MESSAGE_EXPIRY_SCAN_PERIOD;
@@ -261,7 +265,17 @@
{
connectionScanPeriod = scanPeriod;
}
+
+ public long getConnectionTTLOverride()
+ {
+ return connectionTTLOverride;
+ }
+ public void setConnectionTTLOverride(final long ttl)
+ {
+ this.connectionTTLOverride = ttl;
+ }
+
public List<String> getInterceptorClassNames()
{
return interceptorClassNames;
Modified: trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java 2008-12-03 14:34:42 UTC (rev 5456)
+++ trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java 2008-12-03 17:13:44 UTC (rev 5457)
@@ -94,6 +94,8 @@
securityInvalidationInterval = getLong(e, "security-invalidation-interval", securityInvalidationInterval);
connectionScanPeriod = getLong(e, "connection-scan-period", connectionScanPeriod);
+
+ connectionTTLOverride = getLong(e, "connection-ttl-override", connectionTTLOverride);
transactionTimeout = getLong(e, "transaction-timeout", transactionTimeout);
Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java 2008-12-03 14:34:42 UTC (rev 5456)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java 2008-12-03 17:13:44 UTC (rev 5457)
@@ -90,6 +90,7 @@
// Bytes consumed by the queue on the memory
private final AtomicLong sizeInBytes = new AtomicLong();
+ //FIXME - don't call this a thread - it's a Runnable not a Thread
private volatile Runnable dequeueThread;
private volatile int numberOfPages;
Modified: trunk/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java 2008-12-03 14:34:42 UTC (rev 5456)
+++ trunk/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java 2008-12-03 17:13:44 UTC (rev 5457)
@@ -39,7 +39,7 @@
MessagingBuffer createBuffer(int size);
void fail(MessagingException me);
-
+
void destroy();
boolean isExpired(long now);
@@ -55,4 +55,6 @@
void freeze();
RemotingConnection getReplicatingConnection();
+
+ void setReplicatingConnection(RemotingConnection connection);
}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2008-12-03 14:34:42 UTC (rev 5456)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2008-12-03 17:13:44 UTC (rev 5457)
@@ -151,8 +151,6 @@
private static final Logger log = Logger.getLogger(RemotingConnectionImpl.class);
- private static final float EXPIRE_FACTOR = 1.5f;
-
// Static
// ---------------------------------------------------------------------------------------
@@ -160,6 +158,7 @@
final Map<String, Object> params,
final long callTimeout,
final long pingInterval,
+ final long connectionTTL,
final ScheduledExecutorService pingExecutor,
final ConnectionLifeCycleListener listener)
{
@@ -176,7 +175,12 @@
throw new IllegalStateException("Failed to connect");
}
- RemotingConnection connection = new RemotingConnectionImpl(tc, callTimeout, pingInterval, pingExecutor, null);
+ RemotingConnection connection = new RemotingConnectionImpl(tc,
+ callTimeout,
+ pingInterval,
+ connectionTTL,
+ pingExecutor,
+ null);
handler.conn = connection;
@@ -216,15 +220,13 @@
private volatile boolean destroyed;
- private long expirePeriod;
-
private volatile boolean stopPinging;
private volatile long expireTime = -1;
private final Channel pingChannel;
- private final RemotingConnection replicatingConnection;
+ private volatile RemotingConnection replicatingConnection;
private volatile boolean active;
@@ -232,6 +234,9 @@
private final long pingPeriod;
+ // How long without a ping before the connection times out
+ private final long connectionTTL;
+
private final ScheduledExecutorService pingExecutor;
// Channels 0-9 are reserved for the system
@@ -262,10 +267,19 @@
public RemotingConnectionImpl(final Connection transportConnection,
final long blockingCallTimeout,
final long pingPeriod,
+ final long connectionTTL,
final ScheduledExecutorService pingExecutor,
final List<Interceptor> interceptors)
{
- this(transportConnection, blockingCallTimeout, pingPeriod, pingExecutor, interceptors, null, true, true);
+ this(transportConnection,
+ blockingCallTimeout,
+ pingPeriod,
+ connectionTTL,
+ pingExecutor,
+ interceptors,
+ null,
+ true,
+ true);
}
/*
@@ -274,15 +288,17 @@
public RemotingConnectionImpl(final Connection transportConnection,
final List<Interceptor> interceptors,
final RemotingConnection replicatingConnection,
- final boolean active)
+ final boolean active,
+ final long connectionTTL)
{
- this(transportConnection, -1, -1, null, interceptors, replicatingConnection, active, false);
+ this(transportConnection, -1, -1, connectionTTL, null, interceptors, replicatingConnection, active, false);
}
private RemotingConnectionImpl(final Connection transportConnection,
final long blockingCallTimeout,
final long pingPeriod,
+ final long connectionTTL,
final ScheduledExecutorService pingExecutor,
final List<Interceptor> interceptors,
final RemotingConnection replicatingConnection,
@@ -307,6 +323,8 @@
this.pingPeriod = pingPeriod;
+ this.connectionTTL = connectionTTL;
+
this.pingExecutor = pingExecutor;
// Channel zero is reserved for pinging
@@ -325,8 +343,6 @@
{
pinger = new Pinger();
- expirePeriod = (long)(EXPIRE_FACTOR * pingPeriod);
-
future = pingExecutor.scheduleWithFixedDelay(pinger, 0, pingPeriod, TimeUnit.MILLISECONDS);
}
else
@@ -391,6 +407,11 @@
{
return replicatingConnection;
}
+
+ public void setReplicatingConnection(final RemotingConnection connection)
+ {
+ this.replicatingConnection = connection;
+ }
/*
* This can be called concurrently by more than one thread so needs to be locked
@@ -1005,11 +1026,11 @@
}
public Packet sendBlocking(final Packet packet) throws MessagingException
- {
+ {
// System.identityHashCode(this.connection) + " " + packet.getType());
if (closed)
- {
+ {
throw new MessagingException(MessagingException.NOT_CONNECTED, "Connection is destroyed");
}
@@ -1042,7 +1063,7 @@
}
lock.lock();
-
+
try
{
while (failingOver)
@@ -1120,8 +1141,10 @@
if (replicatingChannel != null)
{
DelayedResult result = new DelayedResult();
-
+
responseActions.add(result);
+
+ responseActionCount++;
replicatingChannel.send(packet);
@@ -1156,6 +1179,8 @@
break;
}
}
+
+ responseActionCount = 0;
}
}
@@ -1175,6 +1200,8 @@
// This will never get called concurrently by more than one thread
+ private int responseActionCount;
+
// TODO it's not ideal synchronizing this since it forms a contention point with replication
// but we need to do this to protect it w.r.t. the check on replicatingChannel
public void replicateResponseReceived()
@@ -1190,7 +1217,7 @@
if (result == null)
{
throw new IllegalStateException("Cannot find response action");
- }
+ }
}
}
@@ -1198,16 +1225,62 @@
if (result != null)
{
result.replicated();
+
+ //TODO - we can optimise this not to lock every time - only if waiting for all replications to return
+ synchronized (replicationLock)
+ {
+ responseActionCount--;
+
+ if (responseActionCount == 0)
+ {
+ replicationLock.notify();
+ }
+ }
}
}
+
+ private void waitForAllReplicationResponse()
+ {
+ synchronized (replicationLock)
+ {
+ if (replicatingChannel != null)
+ {
+ long toWait = 10000; // TODO don't hardcode timeout
+
+ long start = System.currentTimeMillis();
+
+ while (responseActionCount > 0 && toWait > 0)
+ {
+ try
+ {
+ replicationLock.wait();
+ }
+ catch (InterruptedException e)
+ {
+ }
+
+ long now = System.currentTimeMillis();
+ toWait -= now - start;
+
+ start = now;
+ }
+
+ if (toWait <= 0)
+ {
+ log.warn("Timed out waiting for replication responses to return");
+ }
+ }
+ }
+ }
+
public void setHandler(final ChannelHandler handler)
{
this.handler = handler;
}
public void close()
- {
+ {
if (closed)
{
return;
@@ -1238,10 +1311,17 @@
public void transferConnection(final RemotingConnection newConnection)
{
// Needs to synchronize on the connection to make sure no packets from
- // the old connection get processed after transfer has occurred
+ // the old connection get processed after transfer has occurred
synchronized (connection.transferLock)
{
connection.channels.remove(id);
+
+ //If we're reconnecting to a live node which is replicated then there will be a replicating channel
+ //too. We need to then make sure that all replication responses come back since packets aren't
+ //considered confirmed until response comes back and is processed. Otherwise responses to previous
+ //message sends could come back after reconnection resulting in clients resending same message
+ //since it wasn't confirmed yet.
+ waitForAllReplicationResponse();
// And switch it
@@ -1255,7 +1335,7 @@
rnewConnection.channels.put(id, this);
connection = rnewConnection;
- }
+ }
}
public void replayCommands(final int otherLastReceivedCommandID)
@@ -1263,7 +1343,7 @@
clearUpTo(otherLastReceivedCommandID);
for (final Packet packet : resendCache)
- {
+ {
doWrite(packet);
}
}
@@ -1482,7 +1562,7 @@
firstTime = false;
// Send ping
- final Packet ping = new Ping(expirePeriod);
+ final Packet ping = new Ping(connectionTTL);
pingChannel.send(ping);
}
@@ -1505,8 +1585,12 @@
}
else if (type == PING)
{
- expireTime = System.currentTimeMillis() + ((Ping)packet).getExpirePeriod();
+ // connectionTTL if specified on the server overrides any value specified in the ping.
+ long connectionTTLToUse = connectionTTL != -1 ? connectionTTL : ((Ping)packet).getExpirePeriod();
+
+ expireTime = System.currentTimeMillis() + connectionTTLToUse;
+
// Parameter is placeholder for future
final Packet pong = new Pong(-1);
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingServiceImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingServiceImpl.java 2008-12-03 14:34:42 UTC (rev 5456)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingServiceImpl.java 2008-12-03 17:13:44 UTC (rev 5457)
@@ -68,6 +68,8 @@
private TimerTask failedConnectionsTask;
private final long connectionScanPeriod;
+
+ private final long connectionTTL;
private final BufferHandler bufferHandler = new DelegatingBufferHandler();
@@ -98,6 +100,8 @@
}
connectionScanPeriod = config.getConnectionScanPeriod();
+
+ connectionTTL = config.getConnectionTTLOverride();
backup = config.isBackup();
}
@@ -215,7 +219,8 @@
RemotingConnection rc = new RemotingConnectionImpl(connection,
interceptors,
replicatingConnection,
- !backup);
+ !backup,
+ connectionTTL);
Channel channel1 = rc.getChannel(1, -1, false);
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionFailoverCompleteMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionFailoverCompleteMessage.java 2008-12-03 14:34:42 UTC (rev 5456)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionFailoverCompleteMessage.java 2008-12-03 17:13:44 UTC (rev 5457)
@@ -71,6 +71,11 @@
{
name = buffer.getString();
}
+
+ public boolean isRequiresConfirmations()
+ {
+ return false;
+ }
public boolean equals(Object other)
{
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2008-12-03 14:34:42 UTC (rev 5456)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2008-12-03 17:13:44 UTC (rev 5457)
@@ -25,6 +25,7 @@
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
import org.jboss.messaging.core.config.Configuration;
import org.jboss.messaging.core.config.TransportConfiguration;
import org.jboss.messaging.core.exception.MessagingException;
@@ -136,8 +137,6 @@
private ManagementService managementService;
-
-
// Constructors
// ---------------------------------------------------------------------------------
@@ -610,11 +609,13 @@
if (backupConnectorFactory != null)
{
NoCacheConnectionLifeCycleListener listener = new NoCacheConnectionLifeCycleListener();
+
RemotingConnectionImpl replicatingConnection = (RemotingConnectionImpl)RemotingConnectionImpl.createConnection(backupConnectorFactory,
backupConnectorParams,
- 30000,
- 5000,
- this.scheduledExecutor,
+ ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT,
+ ClientSessionFactoryImpl.DEFAULT_PING_PERIOD,
+ ClientSessionFactoryImpl.DEFAULT_CONNECTION_TTL,
+ scheduledExecutor,
listener);
listener.conn = replicatingConnection;
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2008-12-03 14:34:42 UTC (rev 5456)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2008-12-03 17:13:44 UTC (rev 5457)
@@ -163,7 +163,7 @@
}
public HandleStatus addLast(final MessageReference ref)
- {
+ {
HandleStatus status = add(ref, false);
return status;
@@ -727,7 +727,7 @@
// because it's async and could get out of step
// with the live node. Instead, when we replicate the delivery we remove
// the ref from the queue
-
+
if (backup)
{
return;
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2008-12-03 14:34:42 UTC (rev 5456)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2008-12-03 17:13:44 UTC (rev 5457)
@@ -180,15 +180,15 @@
// Otherwise we could end up with a situation where a close comes in, then a delivery comes in,
// then close gets replicated to backup, then delivery gets replicated, but consumer is already
// closed!
- lock.lock();
- try
- {
+// lock.lock();
+// try
+// {
setStarted(false);
- }
- finally
- {
- lock.unlock();
- }
+// }
+// finally
+// {
+// lock.unlock();
+// }
DelayedResult result = channel.replicatePacket(packet);
@@ -239,15 +239,15 @@
public void close() throws Exception
{
- lock.lock();
- try
- {
+// lock.lock();
+// try
+// {
setStarted(false);
- }
- finally
- {
- lock.unlock();
- }
+// }
+// finally
+// {
+// lock.unlock();
+// }
doClose();
}
@@ -299,7 +299,15 @@
public void setStarted(final boolean started)
{
- this.started = browseOnly || started;
+ lock.lock();
+ try
+ {
+ this.started = browseOnly || started;
+ }
+ finally
+ {
+ lock.unlock();
+ }
// Outside the lock
if (started)
@@ -537,7 +545,6 @@
{
deliveringRefs.add(ref);
}
-
if (message instanceof ServerLargeMessage)
{
@@ -582,6 +589,7 @@
if (result == null)
{
// Not replicated - just send now
+
channel.send(packet);
}
else
@@ -590,7 +598,7 @@
result.setResultRunner(new Runnable()
{
public void run()
- {
+ {
channel.send(packet);
}
});
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2008-12-03 14:34:42 UTC (rev 5456)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2008-12-03 17:13:44 UTC (rev 5457)
@@ -1956,6 +1956,7 @@
result = channel.replicatePacket(packet);
// note we process start before response is back from the backup
+
setStarted(true);
}
finally
@@ -2033,7 +2034,6 @@
public void handleFailedOver(final Packet packet)
{
- // No need to replicate since this only occurs after failover
Set<ServerConsumer> consumersClone = new HashSet<ServerConsumer>(consumers.values());
for (ServerConsumer consumer : consumersClone)
@@ -2246,6 +2246,11 @@
msg.setMessageID(id);
}
+
+ if (channel.getReplicatingChannel() != null)
+ {
+ msg.putBooleanProperty(new SimpleString("clustered"), true);
+ }
DelayedResult result = channel.replicatePacket(packet);
@@ -2351,9 +2356,27 @@
public int transferConnection(final RemotingConnection newConnection, final int lastReceivedCommandID)
{
+ boolean wasStarted = this.started;
+
+ if (wasStarted)
+ {
+ this.setStarted(false);
+ }
+
remotingConnection.removeFailureListener(this);
channel.transferConnection(newConnection);
+
+ RemotingConnection oldReplicatingConnection = newConnection.getReplicatingConnection();
+
+ if (oldReplicatingConnection != null)
+ {
+ oldReplicatingConnection.destroy();
+ }
+
+ newConnection.setReplicatingConnection(remotingConnection.getReplicatingConnection());
+
+ remotingConnection.setReplicatingConnection(null);
newConnection.syncIDGeneratorSequence(remotingConnection.getIDGeneratorSequence());
@@ -2367,7 +2390,12 @@
int serverLastReceivedCommandID = channel.getLastReceivedCommandID();
channel.replayCommands(lastReceivedCommandID);
-
+
+ if (wasStarted)
+ {
+ this.setStarted(true);
+ }
+
return serverLastReceivedCommandID;
}
@@ -2383,7 +2411,8 @@
{
try
{
- log.info("Connection failed, so clearing up resources for session " + name);
+ log.info("Connection timed out, so clearing up resources for session " + name);
+
for (Runnable runner : failureRunners)
{
try
Modified: trunk/src/main/org/jboss/messaging/jms/client/JBossConnectionFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/client/JBossConnectionFactory.java 2008-12-03 14:34:42 UTC (rev 5456)
+++ trunk/src/main/org/jboss/messaging/jms/client/JBossConnectionFactory.java 2008-12-03 17:13:44 UTC (rev 5457)
@@ -88,6 +88,8 @@
private final int transactionBatchSize;
private final long pingPeriod;
+
+ private final long connectionTTL;
private final long callTimeout;
@@ -113,15 +115,14 @@
private final boolean preAcknowledge;
-
- private final boolean retryOnFailure;
-
private final long retryInterval;
private final double retryIntervalMultiplier; // For exponential backoff
+
+ private final int maxRetriesBeforeFailover;
+
+ private final int maxRetriesAfterFailover;
- private final int maxRetries;
-
// Constructors ---------------------------------------------------------------------------------
public JBossConnectionFactory(final String discoveryGroupAddress,
@@ -130,6 +131,7 @@
final long discoveryInitialWaitTimeout,
final String loadBalancingPolicyClassName,
final long pingPeriod,
+ final long connectionTTL,
final long callTimeout,
final String clientID,
final int dupsOKBatchSize,
@@ -145,10 +147,10 @@
final boolean autoGroup,
final int maxConnections,
final boolean preAcknowledge,
- final boolean retryOnFailure,
final long retryInterval,
- final double retryIntervalMultiplier,
- final int maxRetries)
+ final double retryIntervalMultiplier,
+ final int maxRetriesBeforeFailover,
+ final int maxRetriesAfterFailover)
{
this.connectorConfigs = null;
this.discoveryGroupAddress = discoveryGroupAddress;
@@ -160,6 +162,7 @@
this.dupsOKBatchSize = dupsOKBatchSize;
this.transactionBatchSize = transactionBatchSize;
this.pingPeriod = pingPeriod;
+ this.connectionTTL = connectionTTL;
this.callTimeout = callTimeout;
this.consumerMaxRate = consumerMaxRate;
this.consumerWindowSize = consumerWindowSize;
@@ -172,10 +175,10 @@
this.autoGroup = autoGroup;
this.maxConnections = maxConnections;
this.preAcknowledge = preAcknowledge;
- this.retryOnFailure = retryOnFailure;
this.retryInterval = retryInterval;
- this.retryIntervalMultiplier = retryIntervalMultiplier;
- this.maxRetries = maxRetries;
+ this.retryIntervalMultiplier = retryIntervalMultiplier;
+ this.maxRetriesBeforeFailover = maxRetriesBeforeFailover;
+ this.maxRetriesAfterFailover = maxRetriesAfterFailover;
}
public JBossConnectionFactory(final String discoveryGroupAddress,
@@ -193,6 +196,7 @@
this.dupsOKBatchSize = ClientSessionFactoryImpl.DEFAULT_ACK_BATCH_SIZE;
this.transactionBatchSize = ClientSessionFactoryImpl.DEFAULT_ACK_BATCH_SIZE;
this.pingPeriod = ClientSessionFactoryImpl.DEFAULT_PING_PERIOD;
+ this.connectionTTL = ClientSessionFactoryImpl.DEFAULT_CONNECTION_TTL;
this.callTimeout = ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT;
this.consumerMaxRate = ClientSessionFactoryImpl.DEFAULT_CONSUMER_MAX_RATE;
this.consumerWindowSize = ClientSessionFactoryImpl.DEFAULT_CONSUMER_WINDOW_SIZE;
@@ -204,11 +208,11 @@
this.blockOnPersistentSend = ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_PERSISTENT_SEND;
this.autoGroup = ClientSessionFactoryImpl.DEFAULT_AUTO_GROUP;
this.maxConnections = ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS;
- this.preAcknowledge = ClientSessionFactoryImpl.DEFAULT_PRE_ACKNOWLEDGE;
- this.retryOnFailure = ClientSessionFactoryImpl.DEFAULT_RETRY_ON_FAILURE;
+ this.preAcknowledge = ClientSessionFactoryImpl.DEFAULT_PRE_ACKNOWLEDGE;
this.retryInterval = ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL;
- this.retryIntervalMultiplier = ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL_MULTIPLIER;
- this.maxRetries = ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES;
+ this.retryIntervalMultiplier = ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL_MULTIPLIER;
+ this.maxRetriesBeforeFailover = ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_BEFORE_FAILOVER;
+ this.maxRetriesAfterFailover = ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_AFTER_FAILOVER;
}
public JBossConnectionFactory(final String discoveryGroupName, final int discoveryGroupPort)
@@ -222,6 +226,7 @@
public JBossConnectionFactory(final List<Pair<TransportConfiguration, TransportConfiguration>> connectorConfigs,
final String loadBalancingPolicyClassName,
final long pingPeriod,
+ final long connectionTTL,
final long callTimeout,
final String clientID,
final int dupsOKBatchSize,
@@ -237,10 +242,10 @@
final boolean autoGroup,
final int maxConnections,
final boolean preAcknowledge,
- final boolean retryOnFailure,
final long retryInterval,
- final double retryIntervalMultiplier,
- final int maxRetries)
+ final double retryIntervalMultiplier,
+ final int maxRetriesBeforeFailover,
+ final int maxRetriesAfterFailover)
{
this.discoveryGroupAddress = null;
this.discoveryGroupPort = -1;
@@ -252,6 +257,7 @@
this.dupsOKBatchSize = dupsOKBatchSize;
this.transactionBatchSize = transactionBatchSize;
this.pingPeriod = pingPeriod;
+ this.connectionTTL = connectionTTL;
this.callTimeout = callTimeout;
this.consumerMaxRate = consumerMaxRate;
this.consumerWindowSize = consumerWindowSize;
@@ -264,16 +270,17 @@
this.autoGroup = autoGroup;
this.maxConnections = maxConnections;
this.preAcknowledge = preAcknowledge;
- this.retryOnFailure = retryOnFailure;
this.retryInterval = retryInterval;
- this.retryIntervalMultiplier = retryIntervalMultiplier;
- this.maxRetries = maxRetries;
+ this.retryIntervalMultiplier = retryIntervalMultiplier;
+ this.maxRetriesBeforeFailover = maxRetriesBeforeFailover;
+ this.maxRetriesAfterFailover = maxRetriesAfterFailover;
}
public JBossConnectionFactory(final TransportConfiguration transportConfig,
final TransportConfiguration backupConfig,
final String loadBalancingPolicyClassName,
final long pingPeriod,
+ final long connectionTTL,
final long callTimeout,
final String clientID,
final int dupsOKBatchSize,
@@ -288,11 +295,11 @@
final boolean blockOnPersistentSend,
final boolean autoGroup,
final int maxConnections,
- final boolean preAcknowledge,
- final boolean retryOnFailure,
+ final boolean preAcknowledge,
final long retryInterval,
- final double retryIntervalMultiplier,
- final int maxRetries)
+ final double retryIntervalMultiplier,
+ final int maxRetriesBeforeFailover,
+ final int maxRetriesAfterFailover)
{
this.discoveryGroupAddress = null;
this.discoveryGroupPort = -1;
@@ -306,6 +313,7 @@
this.dupsOKBatchSize = dupsOKBatchSize;
this.transactionBatchSize = transactionBatchSize;
this.pingPeriod = pingPeriod;
+ this.connectionTTL = connectionTTL;
this.callTimeout = callTimeout;
this.consumerMaxRate = consumerMaxRate;
this.consumerWindowSize = consumerWindowSize;
@@ -318,10 +326,10 @@
this.autoGroup = autoGroup;
this.maxConnections = maxConnections;
this.preAcknowledge = preAcknowledge;
- this.retryOnFailure = retryOnFailure;
this.retryInterval = retryInterval;
- this.retryIntervalMultiplier = retryIntervalMultiplier;
- this.maxRetries = maxRetries;
+ this.retryIntervalMultiplier = retryIntervalMultiplier;
+ this.maxRetriesBeforeFailover = maxRetriesBeforeFailover;
+ this.maxRetriesAfterFailover = maxRetriesAfterFailover;
}
@@ -337,6 +345,7 @@
this.dupsOKBatchSize = ClientSessionFactoryImpl.DEFAULT_ACK_BATCH_SIZE;
this.transactionBatchSize = ClientSessionFactoryImpl.DEFAULT_ACK_BATCH_SIZE;
this.pingPeriod = ClientSessionFactoryImpl.DEFAULT_PING_PERIOD;
+ this.connectionTTL = ClientSessionFactoryImpl.DEFAULT_CONNECTION_TTL;
this.callTimeout = ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT;
this.consumerMaxRate = ClientSessionFactoryImpl.DEFAULT_CONSUMER_MAX_RATE;
this.consumerWindowSize = ClientSessionFactoryImpl.DEFAULT_CONSUMER_WINDOW_SIZE;
@@ -349,10 +358,10 @@
this.autoGroup = ClientSessionFactoryImpl.DEFAULT_AUTO_GROUP;
this.maxConnections = ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS;
this.preAcknowledge = ClientSessionFactoryImpl.DEFAULT_PRE_ACKNOWLEDGE;
- this.retryOnFailure = ClientSessionFactoryImpl.DEFAULT_RETRY_ON_FAILURE;
this.retryInterval = ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL;
- this.retryIntervalMultiplier = ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL_MULTIPLIER;
- this.maxRetries = ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES;
+ this.retryIntervalMultiplier = ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL_MULTIPLIER;
+ this.maxRetriesBeforeFailover = ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_BEFORE_FAILOVER;
+ this.maxRetriesAfterFailover = ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_AFTER_FAILOVER;
}
public JBossConnectionFactory(final TransportConfiguration connectorConfig)
@@ -371,6 +380,7 @@
this.dupsOKBatchSize = ClientSessionFactoryImpl.DEFAULT_ACK_BATCH_SIZE;
this.transactionBatchSize = ClientSessionFactoryImpl.DEFAULT_ACK_BATCH_SIZE;
this.pingPeriod = ClientSessionFactoryImpl.DEFAULT_PING_PERIOD;
+ this.connectionTTL = ClientSessionFactoryImpl.DEFAULT_CONNECTION_TTL;
this.callTimeout = ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT;
this.consumerMaxRate = ClientSessionFactoryImpl.DEFAULT_CONSUMER_MAX_RATE;
this.consumerWindowSize = ClientSessionFactoryImpl.DEFAULT_CONSUMER_WINDOW_SIZE;
@@ -383,10 +393,10 @@
this.autoGroup = ClientSessionFactoryImpl.DEFAULT_AUTO_GROUP;
this.maxConnections = ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS;
this.preAcknowledge = ClientSessionFactoryImpl.DEFAULT_PRE_ACKNOWLEDGE;
- this.retryOnFailure = ClientSessionFactoryImpl.DEFAULT_RETRY_ON_FAILURE;
this.retryInterval = ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL;
- this.retryIntervalMultiplier = ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL_MULTIPLIER;
- this.maxRetries = ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES;
+ this.retryIntervalMultiplier = ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL_MULTIPLIER;
+ this.maxRetriesBeforeFailover = ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_BEFORE_FAILOVER;
+ this.maxRetriesAfterFailover = ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_AFTER_FAILOVER;
}
// ConnectionFactory implementation -------------------------------------------------------------
@@ -551,6 +561,7 @@
sessionFactory = new ClientSessionFactoryImpl(connectorConfigs,
connectionLoadBalancingPolicyClassName,
pingPeriod,
+ connectionTTL,
callTimeout,
consumerWindowSize,
consumerMaxRate,
@@ -563,11 +574,11 @@
autoGroup,
maxConnections,
preAcknowledge,
- dupsOKBatchSize,
- retryOnFailure,
+ dupsOKBatchSize,
retryInterval,
- retryIntervalMultiplier,
- maxRetries);
+ retryIntervalMultiplier,
+ maxRetriesBeforeFailover,
+ maxRetriesAfterFailover);
}
else
{
@@ -577,6 +588,7 @@
discoveryInitialWaitTimeout,
connectionLoadBalancingPolicyClassName,
pingPeriod,
+ connectionTTL,
callTimeout,
consumerWindowSize,
consumerMaxRate,
@@ -589,11 +601,11 @@
autoGroup,
maxConnections,
preAcknowledge,
- dupsOKBatchSize,
- retryOnFailure,
+ dupsOKBatchSize,
retryInterval,
retryIntervalMultiplier,
- maxRetries);
+ maxRetriesBeforeFailover,
+ maxRetriesAfterFailover);
}
}
Modified: trunk/src/main/org/jboss/messaging/jms/client/JMSMessageListenerWrapper.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/client/JMSMessageListenerWrapper.java 2008-12-03 14:34:42 UTC (rev 5456)
+++ trunk/src/main/org/jboss/messaging/jms/client/JMSMessageListenerWrapper.java 2008-12-03 17:13:44 UTC (rev 5457)
@@ -125,12 +125,7 @@
if (!consumer.isClosed() && !this.transactedOrClientAck)
{
message.acknowledge();
- }
-
- if (consumer.isClosed())
- {
- log.info("not acking, consumer is closed");
- }
+ }
}
catch (MessagingException e)
{
Modified: trunk/src/main/org/jboss/messaging/jms/server/JMSServerManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/JMSServerManager.java 2008-12-03 14:34:42 UTC (rev 5456)
+++ trunk/src/main/org/jboss/messaging/jms/server/JMSServerManager.java 2008-12-03 17:13:44 UTC (rev 5457)
@@ -111,6 +111,7 @@
List<Pair<TransportConfiguration, TransportConfiguration>> connectorConfigs,
String connectionLoadBalancingPolicyClassName,
long pingPeriod,
+ long connectionTTL,
long callTimeout,
String clientID,
int dupsOKBatchSize,
@@ -125,11 +126,11 @@
boolean blockOnPersistentSend,
boolean autoGroup,
int maxConnections,
- boolean preAcknowledge,
- final boolean retryOnFailure,
+ boolean preAcknowledge,
final long retryInterval,
- final double retryIntervalMultiplier,
- final int maxRetries,
+ final double retryIntervalMultiplier,
+ final int maxRetriesBeforeFailover,
+ final int maxRetriesAfterFailover,
List<String> jndiBindings) throws Exception;
boolean createConnectionFactory(String name,
@@ -137,6 +138,7 @@
long discoveryInitialWait,
String connectionLoadBalancingPolicyClassName,
long pingPeriod,
+ long connectionTTL,
long callTimeout,
String clientID,
int dupsOKBatchSize,
@@ -151,11 +153,11 @@
boolean blockOnPersistentSend,
boolean autoGroup,
int maxConnections,
- boolean preAcknowledge,
- final boolean retryOnFailure,
+ boolean preAcknowledge,
final long retryInterval,
- final double retryIntervalMultiplier,
- final int maxRetries,
+ final double retryIntervalMultiplier,
+ final int maxRetriesBeforeFailover,
+ final int maxRetriesAfterFailover,
List<String> jndiBindings) throws Exception;
/**
Modified: trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerDeployer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerDeployer.java 2008-12-03 14:34:42 UTC (rev 5456)
+++ trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerDeployer.java 2008-12-03 17:13:44 UTC (rev 5457)
@@ -43,6 +43,8 @@
private static final String CLIENTID_ELEMENT = "client-id";
private static final String PING_PERIOD_ELEMENT = "ping-period";
+
+ private static final String CONNECTION_TTL_ELEMENT = "connection-ttl";
private static final String CALL_TIMEOUT_ELEMENT = "call-timeout";
@@ -78,7 +80,9 @@
private static final String RETRY_INTERVAL_MULTIPLIER = "retry-interval-multiplier";
- private static final String MAX_RETRIES = "max-retries";
+ private static final String MAX_RETRIES_BEFORE_FAILOVER = "max-retries-before-failover";
+
+ private static final String MAX_RETRIES_AFTER_FAILOVER = "max-retries-after-failover";
private static final String CONNECTOR_LINK_ELEMENT = "connector-ref";
@@ -144,6 +148,7 @@
NodeList children = node.getChildNodes();
long pingPeriod = ClientSessionFactoryImpl.DEFAULT_PING_PERIOD;
+ long connectionTTL = ClientSessionFactoryImpl.DEFAULT_CONNECTION_TTL;
long callTimeout = ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT;
String clientID = null;
int dupsOKBatchSize = ClientSessionFactoryImpl.DEFAULT_ACK_BATCH_SIZE;
@@ -159,10 +164,10 @@
boolean autoGroup = ClientSessionFactoryImpl.DEFAULT_AUTO_GROUP;
int maxConnections = ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS;
boolean preAcknowledge = ClientSessionFactoryImpl.DEFAULT_PRE_ACKNOWLEDGE;
- boolean retryOnFailure = ClientSessionFactoryImpl.DEFAULT_RETRY_ON_FAILURE;
long retryInterval = ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL;
- double retryIntervalMultiplier = ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL_MULTIPLIER;
- int maxRetries = ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES;
+ double retryIntervalMultiplier = ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL_MULTIPLIER;
+ int maxRetriesBeforeFailover = ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_BEFORE_FAILOVER;
+ int maxRetriesAfterFailover = ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_AFTER_FAILOVER;
List<String> jndiBindings = new ArrayList<String>();
List<Pair<TransportConfiguration, TransportConfiguration>> connectorConfigs = new ArrayList<Pair<TransportConfiguration, TransportConfiguration>>();
@@ -178,6 +183,10 @@
{
pingPeriod = XMLUtil.parseLong(child);
}
+ else if (CONNECTION_TTL_ELEMENT.equals(child.getNodeName()))
+ {
+ connectionTTL = XMLUtil.parseLong(child);
+ }
else if (CALL_TIMEOUT_ELEMENT.equals(child.getNodeName()))
{
callTimeout = XMLUtil.parseLong(child);
@@ -250,10 +259,14 @@
{
retryIntervalMultiplier = XMLUtil.parseDouble(child);
}
- else if (MAX_RETRIES.equals(child.getNodeName()))
+ else if (MAX_RETRIES_BEFORE_FAILOVER.equals(child.getNodeName()))
{
- maxRetries = XMLUtil.parseInt(child);;
+ maxRetriesBeforeFailover = XMLUtil.parseInt(child);;
}
+ else if (MAX_RETRIES_AFTER_FAILOVER.equals(child.getNodeName()))
+ {
+ maxRetriesAfterFailover = XMLUtil.parseInt(child);;
+ }
else if (ENTRY_NODE_NAME.equals(child.getNodeName()))
{
String jndiName = child.getAttributes().getNamedItem("name").getNodeValue();
@@ -325,6 +338,7 @@
discoveryInitialWait,
connectionLoadBalancingPolicyClassName,
pingPeriod,
+ connectionTTL,
callTimeout,
clientID,
dupsOKBatchSize,
@@ -339,11 +353,11 @@
blockOnPersistentSend,
autoGroup,
maxConnections,
- preAcknowledge,
- retryOnFailure,
+ preAcknowledge,
retryInterval,
- retryIntervalMultiplier,
- maxRetries,
+ retryIntervalMultiplier,
+ maxRetriesBeforeFailover,
+ maxRetriesAfterFailover,
jndiBindings);
}
else
@@ -352,6 +366,7 @@
connectorConfigs,
connectionLoadBalancingPolicyClassName,
pingPeriod,
+ connectionTTL,
callTimeout,
clientID,
dupsOKBatchSize,
@@ -366,11 +381,11 @@
blockOnPersistentSend,
autoGroup,
maxConnections,
- preAcknowledge,
- retryOnFailure,
+ preAcknowledge,
retryInterval,
- retryIntervalMultiplier,
- maxRetries,
+ retryIntervalMultiplier,
+ maxRetriesBeforeFailover,
+ maxRetriesAfterFailover,
jndiBindings);
}
}
Modified: trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerManagerImpl.java 2008-12-03 14:34:42 UTC (rev 5456)
+++ trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerManagerImpl.java 2008-12-03 17:13:44 UTC (rev 5457)
@@ -216,6 +216,7 @@
final List<Pair<TransportConfiguration, TransportConfiguration>> connectorConfigs,
final String connectionLoadBalancingPolicyClassName,
final long pingPeriod,
+ final long connectionTTL,
final long callTimeout,
final String clientID,
final int dupsOKBatchSize,
@@ -230,11 +231,11 @@
final boolean blockOnPersistentSend,
final boolean autoGroup,
final int maxConnections,
- final boolean preAcknowledge,
- final boolean retryOnFailure,
+ final boolean preAcknowledge,
final long retryInterval,
- final double retryIntervalMultiplier,
- final int maxRetries,
+ final double retryIntervalMultiplier,
+ final int maxRetriesBeforeFailover,
+ final int maxRetriesAfterFailover,
final List<String> jndiBindings) throws Exception
{
JBossConnectionFactory cf = connectionFactories.get(name);
@@ -243,6 +244,7 @@
cf = new JBossConnectionFactory(connectorConfigs,
connectionLoadBalancingPolicyClassName,
pingPeriod,
+ connectionTTL,
callTimeout,
clientID,
dupsOKBatchSize,
@@ -257,11 +259,11 @@
blockOnPersistentSend,
autoGroup,
maxConnections,
- preAcknowledge,
- retryOnFailure,
+ preAcknowledge,
retryInterval,
- retryIntervalMultiplier,
- maxRetries);
+ retryIntervalMultiplier,
+ maxRetriesBeforeFailover,
+ maxRetriesAfterFailover);
}
bindConnectionFactory(cf, name, jndiBindings);
@@ -274,6 +276,7 @@
final long discoveryInitialWait,
final String connectionLoadBalancingPolicyClassName,
final long pingPeriod,
+ final long connectionTTL,
final long callTimeout,
final String clientID,
final int dupsOKBatchSize,
@@ -288,11 +291,11 @@
final boolean blockOnPersistentSend,
final boolean autoGroup,
final int maxConnections,
- final boolean preAcknowledge,
- final boolean retryOnFailure,
+ final boolean preAcknowledge,
final long retryInterval,
- final double retryIntervalMultiplier,
- final int maxRetries,
+ final double retryIntervalMultiplier,
+ final int maxRetriesBeforeFailover,
+ final int maxRetriesAfterFailover,
final List<String> jndiBindings) throws Exception
{
JBossConnectionFactory cf = connectionFactories.get(name);
@@ -304,6 +307,7 @@
discoveryInitialWait,
connectionLoadBalancingPolicyClassName,
pingPeriod,
+ connectionTTL,
callTimeout,
clientID,
dupsOKBatchSize,
@@ -318,11 +322,11 @@
blockOnPersistentSend,
autoGroup,
maxConnections,
- preAcknowledge,
- retryOnFailure,
+ preAcknowledge,
retryInterval,
- retryIntervalMultiplier,
- maxRetries);
+ retryIntervalMultiplier,
+ maxRetriesBeforeFailover,
+ maxRetriesAfterFailover);
}
bindConnectionFactory(cf, name, jndiBindings);
Modified: trunk/src/main/org/jboss/messaging/jms/server/management/JMSServerControlMBean.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/management/JMSServerControlMBean.java 2008-12-03 14:34:42 UTC (rev 5456)
+++ trunk/src/main/org/jboss/messaging/jms/server/management/JMSServerControlMBean.java 2008-12-03 17:13:44 UTC (rev 5457)
@@ -75,9 +75,11 @@
List<Pair<TransportConfiguration, TransportConfiguration>> connectorConfigs,
@Parameter(name = "connectionLoadBalancingPolicyClassName", desc = "The name of the class to use for client side connection load-balancing")
String connectionLoadBalancingPolicyClassName,
- @Parameter(name = "pingPeriod", desc = "The ping period in m")
+ @Parameter(name = "pingPeriod", desc = "The ping period in ms")
long pingPeriod,
- @Parameter(name = "callTimeout", desc = "The call timeout in m")
+ @Parameter(name = "connectionTTL", desc = "The connection TTL in ms")
+ long connectionTTL,
+ @Parameter(name = "callTimeout", desc = "The call timeout in ms")
long callTimeout,
@Parameter(name = "clientID", desc = "ClientID for created connections")
String clientID,
@@ -106,15 +108,15 @@
@Parameter(name = "maxConnections", desc = "The maximum number of physical connections created per client using this connection factory. Sessions created will be assigned a connection in a round-robin fashion")
int maxConnections,
@Parameter(name = "preAcknowledge", desc = "If the server will acknowledge delivery of a message before it is delivered")
- boolean preAcknowledge,
- @Parameter(name = "retryOnFailure", desc = "Will the server attempt to retry connecting to same server in event of failure?")
- boolean retryOnFailure,
+ boolean preAcknowledge,
@Parameter(name = "retryInterval", desc = "The retry interval in ms when retrying connecting to same server")
long retryInterval,
@Parameter(name = "retryIntervalMultiplier", desc = "The retry interval multiplier when retrying connecting to same server")
double retryIntervalMultiplier,
- @Parameter(name = "maxRetries", desc = "The maximum number of retries when retrying connecting to same server. -1 means no maximum")
- int maxRetries,
+ @Parameter(name = "maxRetriesBeforeFailover", desc = "The maximum number of connection attempts to a server before failover. -1 means no maximum")
+ int maxRetriesBeforeFailover,
+ @Parameter(name = "maxRetriesAfterFailover", desc = "The maximum number of connection attempts to a server after failover. -1 means no maximum")
+ int maxRetriesAfterFailover,
@Parameter(name = "jndiBinding", desc = "JNDI Binding")
String jndiBinding) throws Exception;
@@ -129,6 +131,8 @@
String connectionLoadBalancingPolicyClassName,
@Parameter(name = "pingPeriod", desc = "The ping period in m")
long pingPeriod,
+ @Parameter(name = "connectionTTL", desc = "The connection TTL in ms")
+ long connectionTTL,
@Parameter(name = "callTimeout", desc = "The call timeout in m")
long callTimeout,
@Parameter(name = "clientID", desc = "ClientID for created connections")
@@ -158,15 +162,15 @@
@Parameter(name = "maxConnections", desc = "The maximum number of physical connections created per client using this connection factory. Sessions created will be assigned a connection in a round-robin fashion")
int maxConnections,
@Parameter(name = "preAcknowledge", desc = "If the server will acknowledge delivery of a message before it is delivered")
- boolean preAcknowledge,
- @Parameter(name = "retryOnFailure", desc = "Will the server attempt to retry connecting to same server in event of failure?")
- boolean retryOnFailure,
+ boolean preAcknowledge,
@Parameter(name = "retryInterval", desc = "The retry interval in ms when retrying connecting to same server")
long retryInterval,
@Parameter(name = "retryIntervalMultiplier", desc = "The retry interval multiplier when retrying connecting to same server")
- double retryIntervalMultiplier,
- @Parameter(name = "maxRetries", desc = "The maximum number of retries when retrying connecting to same server. -1 means no maximum")
- int maxRetries,
+ double retryIntervalMultiplier,
+ @Parameter(name = "maxRetriesBeforeFailover", desc = "The maximum number of connection attempts to a server before failover. -1 means no maximum")
+ int maxRetriesBeforeFailover,
+ @Parameter(name = "maxRetriesAfterFailover", desc = "The maximum number of connection attempts to a server after failover. -1 means no maximum")
+ int maxRetriesAfterFailover,
@Parameter(name = "jndiBinding", desc = "JNDI Binding")
String jndiBinding) throws Exception;
Modified: trunk/src/main/org/jboss/messaging/jms/server/management/impl/JMSServerControl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/management/impl/JMSServerControl.java 2008-12-03 14:34:42 UTC (rev 5456)
+++ trunk/src/main/org/jboss/messaging/jms/server/management/impl/JMSServerControl.java 2008-12-03 17:13:44 UTC (rev 5457)
@@ -81,7 +81,8 @@
public void createConnectionFactory(String name,
List<Pair<TransportConfiguration, TransportConfiguration>> connectorConfigs,
String connectionLoadBalancingPolicyClassName,
- long pingPeriod,
+ long pingPeriod,
+ long connectionTTL,
long callTimeout,
String clientID,
int dupsOKBatchSize,
@@ -96,11 +97,11 @@
boolean blockOnPersistentSend,
boolean autoGroup,
int maxConnections,
- boolean preAcknowledge,
- boolean retryOnFailure,
+ boolean preAcknowledge,
long retryInterval,
- double retryIntervalMultiplier,
- int maxRetries,
+ double retryIntervalMultiplier,
+ int maxRetriesBeforeFailover,
+ int maxRetriesAfterFailover,
String jndiBinding) throws Exception
{
List<String> bindings = new ArrayList<String>();
@@ -109,7 +110,8 @@
boolean created = server.createConnectionFactory(name,
connectorConfigs,
connectionLoadBalancingPolicyClassName,
- pingPeriod,
+ pingPeriod,
+ connectionTTL,
callTimeout,
clientID,
dupsOKBatchSize,
@@ -124,11 +126,11 @@
blockOnPersistentSend,
autoGroup,
maxConnections,
- preAcknowledge,
- retryOnFailure,
+ preAcknowledge,
retryInterval,
- retryIntervalMultiplier,
- maxRetries,
+ retryIntervalMultiplier,
+ maxRetriesBeforeFailover,
+ maxRetriesAfterFailover,
bindings);
if (created)
{
@@ -140,7 +142,8 @@
DiscoveryGroupConfiguration discoveryGroupConfig,
long discoveryInitialWait,
String connectionLoadBalancingPolicyClassName,
- long pingPeriod,
+ long pingPeriod,
+ long connectionTTL,
long callTimeout,
String clientID,
int dupsOKBatchSize,
@@ -155,11 +158,11 @@
boolean blockOnPersistentSend,
boolean autoGroup,
int maxConnections,
- boolean preAcknowledge,
- final boolean retryOnFailure,
+ boolean preAcknowledge,
final long retryInterval,
- final double retryIntervalMultiplier,
- final int maxRetries,
+ final double retryIntervalMultiplier,
+ final int maxRetriesBeforeFailover,
+ final int maxRetriesAfterFailover,
String jndiBinding) throws Exception
{
List<String> bindings = new ArrayList<String>();
@@ -169,7 +172,8 @@
discoveryGroupConfig,
discoveryInitialWait,
connectionLoadBalancingPolicyClassName,
- pingPeriod,
+ pingPeriod,
+ connectionTTL,
callTimeout,
clientID,
dupsOKBatchSize,
@@ -184,11 +188,11 @@
blockOnPersistentSend,
autoGroup,
maxConnections,
- preAcknowledge,
- retryOnFailure,
+ preAcknowledge,
retryInterval,
- retryIntervalMultiplier,
- maxRetries,
+ retryIntervalMultiplier,
+ maxRetriesBeforeFailover,
+ maxRetriesAfterFailover,
bindings);
if (created)
{
Modified: trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/CTSMiscellaneousTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/CTSMiscellaneousTest.java 2008-12-03 14:34:42 UTC (rev 5456)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/CTSMiscellaneousTest.java 2008-12-03 17:13:44 UTC (rev 5457)
@@ -21,10 +21,11 @@
*/
package org.jboss.test.messaging.jms;
-import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONNECTION_TTL;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_AFTER_FAILOVER;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_BEFORE_FAILOVER;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL_MULTIPLIER;
-import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_ON_FAILURE;
import java.util.ArrayList;
import java.util.List;
@@ -104,7 +105,8 @@
getJmsServerManager().createConnectionFactory("StrictTCKConnectionFactory",
connectorConfigs,
ClientSessionFactoryImpl.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME,
- ClientSessionFactoryImpl.DEFAULT_PING_PERIOD,
+ ClientSessionFactoryImpl.DEFAULT_PING_PERIOD,
+ DEFAULT_CONNECTION_TTL,
ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT,
null,
ClientSessionFactoryImpl.DEFAULT_ACK_BATCH_SIZE,
@@ -119,11 +121,11 @@
true,
ClientSessionFactoryImpl.DEFAULT_AUTO_GROUP,
ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS,
- ClientSessionFactoryImpl.DEFAULT_PRE_ACKNOWLEDGE,
- DEFAULT_RETRY_ON_FAILURE,
+ ClientSessionFactoryImpl.DEFAULT_PRE_ACKNOWLEDGE,
DEFAULT_RETRY_INTERVAL,
DEFAULT_RETRY_INTERVAL_MULTIPLIER,
- DEFAULT_MAX_RETRIES,
+ DEFAULT_MAX_RETRIES_BEFORE_FAILOVER,
+ DEFAULT_MAX_RETRIES_AFTER_FAILOVER,
jndiBindings);
cf = (JBossConnectionFactory)getInitialContext().lookup("/StrictTCKConnectionFactory");
Modified: trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/JMSTestCase.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/JMSTestCase.java 2008-12-03 14:34:42 UTC (rev 5456)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/JMSTestCase.java 2008-12-03 17:13:44 UTC (rev 5457)
@@ -1,9 +1,10 @@
package org.jboss.test.messaging.jms;
-import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONNECTION_TTL;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_AFTER_FAILOVER;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_BEFORE_FAILOVER;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL_MULTIPLIER;
-import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_ON_FAILURE;
import java.util.ArrayList;
import java.util.List;
@@ -58,7 +59,8 @@
getJmsServerManager().createConnectionFactory("testsuitecf",
connectorConfigs,
ClientSessionFactoryImpl.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME,
- ClientSessionFactoryImpl.DEFAULT_PING_PERIOD,
+ ClientSessionFactoryImpl.DEFAULT_PING_PERIOD,
+ DEFAULT_CONNECTION_TTL,
ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT,
null,
ClientSessionFactoryImpl.DEFAULT_ACK_BATCH_SIZE,
@@ -73,11 +75,11 @@
true,
ClientSessionFactoryImpl.DEFAULT_AUTO_GROUP,
ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS,
- ClientSessionFactoryImpl.DEFAULT_PRE_ACKNOWLEDGE,
- DEFAULT_RETRY_ON_FAILURE,
+ ClientSessionFactoryImpl.DEFAULT_PRE_ACKNOWLEDGE,
DEFAULT_RETRY_INTERVAL,
DEFAULT_RETRY_INTERVAL_MULTIPLIER,
- DEFAULT_MAX_RETRIES,
+ DEFAULT_MAX_RETRIES_BEFORE_FAILOVER,
+ DEFAULT_MAX_RETRIES_AFTER_FAILOVER,
jndiBindings);
cf = (JBossConnectionFactory)getInitialContext().lookup("/testsuitecf");
Modified: trunk/tests/jms-tests/src/org/jboss/test/messaging/tools/container/LocalTestServer.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/tools/container/LocalTestServer.java 2008-12-03 14:34:42 UTC (rev 5456)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/tools/container/LocalTestServer.java 2008-12-03 17:13:44 UTC (rev 5457)
@@ -21,10 +21,11 @@
*/
package org.jboss.test.messaging.tools.container;
-import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONNECTION_TTL;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_AFTER_FAILOVER;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_BEFORE_FAILOVER;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL_MULTIPLIER;
-import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_ON_FAILURE;
import java.io.File;
import java.lang.management.ManagementFactory;
@@ -55,13 +56,13 @@
import org.jboss.messaging.core.security.Role;
import org.jboss.messaging.core.server.MessagingServer;
import org.jboss.messaging.core.settings.impl.QueueSettings;
+import org.jboss.messaging.integration.bootstrap.JBMBootstrapServer;
import org.jboss.messaging.jms.JBossDestination;
import org.jboss.messaging.jms.server.JMSServerManager;
import org.jboss.messaging.jms.server.management.JMSQueueControlMBean;
import org.jboss.messaging.jms.server.management.SubscriptionInfo;
import org.jboss.messaging.jms.server.management.TopicControlMBean;
import org.jboss.messaging.jms.server.management.impl.JMSManagementServiceImpl;
-import org.jboss.messaging.integration.bootstrap.JBMBootstrapServer;
import org.jboss.messaging.util.Pair;
import org.jboss.messaging.util.SimpleString;
import org.jboss.test.messaging.tools.ConfigurationHelper;
@@ -551,7 +552,8 @@
getJMSServerManager().createConnectionFactory(objectName,
connectorConfigs,
ClientSessionFactoryImpl.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME,
- ClientSessionFactoryImpl.DEFAULT_PING_PERIOD,
+ ClientSessionFactoryImpl.DEFAULT_PING_PERIOD,
+ DEFAULT_CONNECTION_TTL,
ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT,
clientId,
dupsOkBatchSize,
@@ -566,11 +568,11 @@
true,
false,
8,
- false,
- DEFAULT_RETRY_ON_FAILURE,
+ false,
DEFAULT_RETRY_INTERVAL,
DEFAULT_RETRY_INTERVAL_MULTIPLIER,
- DEFAULT_MAX_RETRIES,
+ DEFAULT_MAX_RETRIES_BEFORE_FAILOVER,
+ DEFAULT_MAX_RETRIES_AFTER_FAILOVER,
jndiBindings);
}
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/clientcrash/ClientCrashTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/clientcrash/ClientCrashTest.java 2008-12-03 14:34:42 UTC (rev 5456)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/clientcrash/ClientCrashTest.java 2008-12-03 17:13:44 UTC (rev 5457)
@@ -32,13 +32,13 @@
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONSUMER_MAX_RATE;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONSUMER_WINDOW_SIZE;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS;
-import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_AFTER_FAILOVER;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_BEFORE_FAILOVER;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRE_ACKNOWLEDGE;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRODUCER_MAX_RATE;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL_MULTIPLIER;
-import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_ON_FAILURE;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_SEND_WINDOW_SIZE;
import org.jboss.messaging.core.client.ClientConsumer;
@@ -163,6 +163,7 @@
null,
DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME,
2000,
+ 3000,
DEFAULT_CALL_TIMEOUT,
DEFAULT_CONSUMER_WINDOW_SIZE,
DEFAULT_CONSUMER_MAX_RATE,
@@ -175,11 +176,11 @@
DEFAULT_AUTO_GROUP,
DEFAULT_MAX_CONNECTIONS,
DEFAULT_PRE_ACKNOWLEDGE,
- DEFAULT_ACK_BATCH_SIZE,
- DEFAULT_RETRY_ON_FAILURE,
+ DEFAULT_ACK_BATCH_SIZE,
DEFAULT_RETRY_INTERVAL,
- DEFAULT_RETRY_INTERVAL_MULTIPLIER,
- DEFAULT_MAX_RETRIES);
+ DEFAULT_RETRY_INTERVAL_MULTIPLIER,
+ DEFAULT_MAX_RETRIES_BEFORE_FAILOVER,
+ DEFAULT_MAX_RETRIES_AFTER_FAILOVER);
}
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/ReconnectTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/ReconnectTest.java 2008-12-03 14:34:42 UTC (rev 5456)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/ReconnectTest.java 2008-12-03 17:13:44 UTC (rev 5457)
@@ -81,59 +81,72 @@
final double retryMultiplier = 1d;
- final int maxRetries = -1;
+ final int maxRetriesBeforeFailover = -1;
+
+ final int maxRetriesAfterFailover = 0;
ClientSessionFactoryInternal sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"),
retryInterval,
retryMultiplier,
- maxRetries);
+ maxRetriesBeforeFailover,
+ maxRetriesAfterFailover);
ClientSession session = sf.createSession(false, true, true);
session.createQueue(ADDRESS, ADDRESS, null, false, false, true);
+
+ final int numIterations = 100;
+
+ for (int j = 0; j < numIterations; j++)
+ {
+ log.info("Iteration " + j);
+
+ ClientProducer producer = session.createProducer(ADDRESS);
+
+ final int numMessages = 1000;
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session.createClientMessage(JBossTextMessage.TYPE,
+ false,
+ 0,
+ System.currentTimeMillis(),
+ (byte)1);
+ message.putIntProperty(new SimpleString("count"), i);
+ message.getBody().putString("aardvarks");
+ message.getBody().flip();
+ producer.send(message);
+ }
- ClientProducer producer = session.createProducer(ADDRESS);
-
- final int numMessages = 1000;
-
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = session.createClientMessage(JBossTextMessage.TYPE,
- false,
- 0,
- System.currentTimeMillis(),
- (byte)1);
- message.putIntProperty(new SimpleString("count"), i);
- message.getBody().putString("aardvarks");
- message.getBody().flip();
- producer.send(message);
+ ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+ RemotingConnection conn = ((ClientSessionImpl)session).getConnection();
+
+ conn.fail(new MessagingException(MessagingException.NOT_CONNECTED));
+
+ session.start();
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = consumer.receive(500);
+
+ assertNotNull(message);
+
+ assertEquals("aardvarks", message.getBody().getString());
+
+ assertEquals(i, message.getProperty(new SimpleString("count")));
+
+ message.acknowledge();
+ }
+
+ ClientMessage message = consumer.receiveImmediate();
+
+ assertNull(message);
+
+ producer.close();
+
+ consumer.close();
}
- log.info("Sent messages");
-
- ClientConsumer consumer = session.createConsumer(ADDRESS);
-
- RemotingConnection conn = ((ClientSessionImpl)session).getConnection();
-
- conn.fail(new MessagingException(MessagingException.NOT_CONNECTED));
-
- session.start();
-
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = consumer.receive(500);
-
- assertNotNull(message);
-
- assertEquals("aardvarks", message.getBody().getString());
-
- assertEquals(i, message.getProperty(new SimpleString("count")));
-
- message.acknowledge();
- }
-
- ClientMessage message = consumer.receiveImmediate();
-
- assertNull(message);
session.close();
@@ -150,12 +163,15 @@
final double retryMultiplier = 1d;
- final int maxRetries = -1;
+ final int maxRetriesBeforeFailover = -1;
+
+ final int maxRetriesAfterFailover = 0;
ClientSessionFactoryInternal sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"),
retryInterval,
retryMultiplier,
- maxRetries);
+ maxRetriesBeforeFailover,
+ maxRetriesAfterFailover);
ClientSession session = sf.createSession(false, true, true);
@@ -237,12 +253,15 @@
final double retryMultiplier = 1d;
- final int maxRetries = 3;
+ final int maxRetriesBeforeFailover = 3;
+
+ final int maxRetriesAfterFailover = 0;
ClientSessionFactoryInternal sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"),
retryInterval,
retryMultiplier,
- maxRetries);
+ maxRetriesBeforeFailover,
+ maxRetriesAfterFailover);
ClientSession session = sf.createSession(false, true, true);
@@ -280,7 +299,7 @@
{
try
{
- Thread.sleep(retryInterval * (maxRetries + 1));
+ Thread.sleep(retryInterval * (maxRetriesBeforeFailover + 1));
}
catch (InterruptedException ignore)
{
@@ -314,12 +333,15 @@
final double retryMultiplier = 1d;
- final int maxRetries = 3;
+ final int maxRetriesBeforeFailover = 3;
+
+ final int maxRetriesAfterFailover = 0;
ClientSessionFactoryInternal sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"),
retryInterval,
retryMultiplier,
- maxRetries);
+ maxRetriesBeforeFailover,
+ maxRetriesAfterFailover);
ClientSession session = sf.createSession(false, true, true);
@@ -357,7 +379,7 @@
{
try
{
- Thread.sleep(retryInterval * (maxRetries - 1));
+ Thread.sleep(retryInterval * (maxRetriesBeforeFailover - 1));
}
catch (InterruptedException ignore)
{
@@ -403,12 +425,15 @@
final double retryMultiplier = 1d;
- final int maxRetries = -1;
+ final int maxRetriesBeforeFailover = -1;
+
+ final int maxRetriesAfterFailover = 0;
ClientSessionFactoryInternal sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"),
retryInterval,
retryMultiplier,
- maxRetries);
+ maxRetriesBeforeFailover,
+ maxRetriesAfterFailover);
ClientSession session = sf.createSession(false, true, true);
@@ -495,12 +520,15 @@
final double retryMultiplier = 4d;
- final int maxRetries = -1;
+ final int maxRetriesBeforeFailover = -1;
+
+ final int maxRetriesAfterFailover = 0;
ClientSessionFactoryInternal sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"),
retryInterval,
retryMultiplier,
- maxRetries);
+ maxRetriesBeforeFailover,
+ maxRetriesAfterFailover);
ClientSession session = sf.createSession(false, true, true);
Added: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/ReconnectWithBackupTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/ReconnectWithBackupTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/ReconnectWithBackupTest.java 2008-12-03 17:13:44 UTC (rev 5457)
@@ -0,0 +1,325 @@
+/*
+ * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat
+ * Middleware LLC, and individual contributors by the @authors tag. See the
+ * copyright.txt in the distribution for a full listing of individual
+ * contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it under the
+ * terms of the GNU Lesser General Public License as published by the Free
+ * Software Foundation; either version 2.1 of the License, or (at your option)
+ * any later version.
+ *
+ * This software is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+ * details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this software; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
+ * site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.cluster.failover;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import junit.framework.TestCase;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryInternal;
+import org.jboss.messaging.core.client.impl.ClientSessionImpl;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.impl.RemotingConnectionImpl;
+import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
+import org.jboss.messaging.core.remoting.impl.invm.TransportConstants;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
+import org.jboss.messaging.jms.client.JBossTextMessage;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ *
+ * A ReconnectWithBackupTest
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ * Created 4 Nov 2008 16:54:50
+ *
+ *
+ */
+public class ReconnectWithBackupTest extends TestCase
+{
+ private static final Logger log = Logger.getLogger(ReconnectWithBackupTest.class);
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private static final SimpleString ADDRESS = new SimpleString("FailoverTestAddress");
+
+ private MessagingService liveService;
+
+ private MessagingService backupService;
+
+ private final Map<String, Object> backupParams = new HashMap<String, Object>();
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ /*
+ * Test retrying reconnect on current node before failover
+ */
+ public void testRetryBeforeFailoverSuccess() throws Exception
+ {
+ final long retryInterval = 500;
+
+ final double retryMultiplier = 1d;
+
+ final int maxRetriesBeforeFailover = 3;
+
+ final int maxRetriesAfterFailover = 0;
+
+ ClientSessionFactoryInternal sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"),
+ new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+ backupParams),
+ retryInterval,
+ retryMultiplier,
+ maxRetriesBeforeFailover,
+ maxRetriesAfterFailover);
+
+ ClientSession session = sf.createSession(false, true, true);
+
+ session.createQueue(ADDRESS, ADDRESS, null, false, false, true);
+
+ final int numIterations = 100;
+
+ // We reconnect in a loop a few times
+ for (int j = 0; j < numIterations; j++)
+ {
+ ClientProducer producer = session.createProducer(ADDRESS);
+
+ final int numMessages = 1000;
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session.createClientMessage(JBossTextMessage.TYPE,
+ false,
+ 0,
+ System.currentTimeMillis(),
+ (byte)1);
+ message.putIntProperty(new SimpleString("count"), i);
+ message.getBody().putString("aardvarks");
+ message.getBody().flip();
+ producer.send(message);
+ }
+ ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+ RemotingConnection conn = ((ClientSessionImpl)session).getConnection();
+
+ conn.fail(new MessagingException(MessagingException.NOT_CONNECTED));
+
+ session.start();
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = consumer.receive(500);
+
+ assertNotNull(message);
+
+ assertEquals("aardvarks", message.getBody().getString());
+
+ assertEquals(i, message.getProperty(new SimpleString("count")));
+
+ message.acknowledge();
+ }
+
+ ClientMessage message = consumer.receiveImmediate();
+
+ assertNull(message);
+
+ producer.close();
+
+ consumer.close();
+ }
+
+ session.close();
+
+ sf.close();
+ }
+
+ public void testFailoverThenFailAgainThenSuccessAfterRetry() throws Exception
+ {
+ final long retryInterval = 500;
+
+ final double retryMultiplier = 1d;
+
+ final int maxRetriesBeforeFailover = 0;
+
+ final int maxRetriesAfterFailover = 3;
+
+ ClientSessionFactoryInternal sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"),
+ new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+ backupParams),
+ retryInterval,
+ retryMultiplier,
+ maxRetriesBeforeFailover,
+ maxRetriesAfterFailover);
+
+ ClientSession session = sf.createSession(false, true, true);
+
+ session.createQueue(ADDRESS, ADDRESS, null, false, false, true);
+
+ ClientProducer producer = session.createProducer(ADDRESS);
+
+ final int numMessages = 100;
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session.createClientMessage(JBossTextMessage.TYPE,
+ false,
+ 0,
+ System.currentTimeMillis(),
+ (byte)1);
+ message.putIntProperty(new SimpleString("count"), i);
+ message.getBody().putString("aardvarks");
+ message.getBody().flip();
+ producer.send(message);
+ }
+ ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+ RemotingConnection conn = ((ClientSessionImpl)session).getConnection();
+
+ conn.fail(new MessagingException(MessagingException.NOT_CONNECTED));
+
+ session.start();
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = consumer.receive(500);
+
+ assertNotNull(message);
+
+ assertEquals("aardvarks", message.getBody().getString());
+
+ assertEquals(i, message.getProperty(new SimpleString("count")));
+
+ message.acknowledge();
+ }
+
+ ClientMessage message = consumer.receiveImmediate();
+
+ assertNull(message);
+
+ session.stop();
+
+ final int numIterations = 10;
+
+ for (int j = 0; j < numIterations; j++)
+ {
+
+ // Send some more messages
+
+ for (int i = numMessages; i < numMessages * 2; i++)
+ {
+ message = session.createClientMessage(JBossTextMessage.TYPE, false, 0, System.currentTimeMillis(), (byte)1);
+ message.putIntProperty(new SimpleString("count"), i);
+ message.getBody().putString("aardvarks");
+ message.getBody().flip();
+ producer.send(message);
+ }
+
+ // Now fail again - should reconnect to the backup node
+
+ conn = ((ClientSessionImpl)session).getConnection();
+
+ conn.fail(new MessagingException(MessagingException.NOT_CONNECTED));
+
+ session.start();
+
+ for (int i = numMessages; i < numMessages * 2; i++)
+ {
+ message = consumer.receive(500);
+
+ assertNotNull(message);
+
+ assertEquals("aardvarks", message.getBody().getString());
+
+ assertEquals(i, message.getProperty(new SimpleString("count")));
+
+ message.acknowledge();
+ }
+
+ message = consumer.receiveImmediate();
+
+ assertNull(message);
+ }
+
+ session.close();
+
+ sf.close();
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ Configuration backupConf = new ConfigurationImpl();
+ backupConf.setSecurityEnabled(false);
+ backupParams.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
+ backupConf.getAcceptorConfigurations()
+ .add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory",
+ backupParams));
+ backupConf.setBackup(true);
+ backupService = MessagingServiceImpl.newNullStorageMessagingServer(backupConf);
+ backupService.start();
+
+ Configuration liveConf = new ConfigurationImpl();
+ liveConf.setSecurityEnabled(false);
+ liveConf.getAcceptorConfigurations()
+ .add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory"));
+ Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
+ TransportConfiguration backupTC = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+ backupParams,
+ "backup-connector");
+ connectors.put(backupTC.getName(), backupTC);
+ liveConf.setConnectorConfigurations(connectors);
+ liveConf.setBackupConnectorName(backupTC.getName());
+ liveService = MessagingServiceImpl.newNullStorageMessagingServer(liveConf);
+ liveService.start();
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ assertEquals(0, backupService.getServer().getRemotingService().getConnections().size());
+
+ backupService.stop();
+
+ assertEquals(0, liveService.getServer().getRemotingService().getConnections().size());
+
+ liveService.stop();
+
+ assertEquals(0, InVMRegistry.instance.size());
+ }
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/ReplicateConnectionFailureTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/ReplicateConnectionFailureTest.java 2008-12-03 14:34:42 UTC (rev 5456)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/ReplicateConnectionFailureTest.java 2008-12-03 17:13:44 UTC (rev 5457)
@@ -29,16 +29,17 @@
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_PERSISTENT_SEND;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONNECTION_TTL;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONSUMER_MAX_RATE;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONSUMER_WINDOW_SIZE;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS;
-import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_AFTER_FAILOVER;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_BEFORE_FAILOVER;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRE_ACKNOWLEDGE;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRODUCER_MAX_RATE;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL_MULTIPLIER;
-import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_ON_FAILURE;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_SEND_WINDOW_SIZE;
import java.util.HashMap;
@@ -101,6 +102,7 @@
backupParams),
DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME,
pingPeriod,
+ (long)(pingPeriod * 1.5),
DEFAULT_CALL_TIMEOUT,
DEFAULT_CONSUMER_WINDOW_SIZE,
DEFAULT_CONSUMER_MAX_RATE,
@@ -113,11 +115,11 @@
DEFAULT_AUTO_GROUP,
DEFAULT_MAX_CONNECTIONS,
DEFAULT_PRE_ACKNOWLEDGE,
- DEFAULT_ACK_BATCH_SIZE,
- DEFAULT_RETRY_ON_FAILURE,
+ DEFAULT_ACK_BATCH_SIZE,
DEFAULT_RETRY_INTERVAL,
DEFAULT_RETRY_INTERVAL_MULTIPLIER,
- DEFAULT_MAX_RETRIES);
+ DEFAULT_MAX_RETRIES_BEFORE_FAILOVER,
+ DEFAULT_MAX_RETRIES_AFTER_FAILOVER);
sf1.setSendWindowSize(32 * 1024);
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/SimpleAutomaticFailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/SimpleAutomaticFailoverTest.java 2008-12-03 14:34:42 UTC (rev 5456)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/SimpleAutomaticFailoverTest.java 2008-12-03 17:13:44 UTC (rev 5457)
@@ -680,9 +680,21 @@
public void testFailureAfterFailover() throws Exception
{
+ final long retryInterval = 500;
+
+ final double retryMultiplier = 1d;
+
+ final int maxRetriesBeforeFailover = 0;
+
+ final int maxRetriesAfterFailover = 0;
+
ClientSessionFactoryInternal sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"),
new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
- backupParams));
+ backupParams),
+ retryInterval,
+ retryMultiplier,
+ maxRetriesBeforeFailover,
+ maxRetriesAfterFailover);
sf.setSendWindowSize(32 * 1024);
@@ -759,7 +771,7 @@
}
catch (MessagingException me)
{
- assertEquals(MessagingException.OBJECT_CLOSED, me.getCode());
+ assertEquals(MessagingException.NOT_CONNECTED, me.getCode());
}
session.close();
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/jms/cluster/JMSFailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/jms/cluster/JMSFailoverTest.java 2008-12-03 14:34:42 UTC (rev 5456)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/jms/cluster/JMSFailoverTest.java 2008-12-03 17:13:44 UTC (rev 5457)
@@ -28,17 +28,18 @@
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONNECTION_TTL;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONSUMER_MAX_RATE;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONSUMER_WINDOW_SIZE;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS;
-import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_AFTER_FAILOVER;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_BEFORE_FAILOVER;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PING_PERIOD;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRE_ACKNOWLEDGE;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRODUCER_MAX_RATE;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL_MULTIPLIER;
-import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_ON_FAILURE;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_SEND_WINDOW_SIZE;
import java.util.HashMap;
@@ -56,7 +57,6 @@
import junit.framework.TestCase;
import org.jboss.messaging.core.client.ClientSession;
-import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
import org.jboss.messaging.core.client.impl.ClientSessionImpl;
import org.jboss.messaging.core.config.Configuration;
import org.jboss.messaging.core.config.TransportConfiguration;
@@ -113,6 +113,7 @@
backupParams),
DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME,
DEFAULT_PING_PERIOD,
+ DEFAULT_CONNECTION_TTL,
DEFAULT_CALL_TIMEOUT,
null,
DEFAULT_ACK_BATCH_SIZE,
@@ -127,11 +128,11 @@
true,
DEFAULT_AUTO_GROUP,
DEFAULT_MAX_CONNECTIONS,
- DEFAULT_PRE_ACKNOWLEDGE,
- DEFAULT_RETRY_ON_FAILURE,
+ DEFAULT_PRE_ACKNOWLEDGE,
DEFAULT_RETRY_INTERVAL,
DEFAULT_RETRY_INTERVAL_MULTIPLIER,
- DEFAULT_MAX_RETRIES);
+ DEFAULT_MAX_RETRIES_BEFORE_FAILOVER,
+ DEFAULT_MAX_RETRIES_AFTER_FAILOVER);
Connection conn = jbcf.createConnection();
@@ -198,6 +199,7 @@
null,
DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME,
DEFAULT_PING_PERIOD,
+ DEFAULT_CONNECTION_TTL,
DEFAULT_CALL_TIMEOUT,
null,
DEFAULT_ACK_BATCH_SIZE,
@@ -212,17 +214,18 @@
true,
DEFAULT_AUTO_GROUP,
DEFAULT_MAX_CONNECTIONS,
- DEFAULT_PRE_ACKNOWLEDGE,
- DEFAULT_RETRY_ON_FAILURE,
+ DEFAULT_PRE_ACKNOWLEDGE,
DEFAULT_RETRY_INTERVAL,
DEFAULT_RETRY_INTERVAL_MULTIPLIER,
- DEFAULT_MAX_RETRIES);
+ DEFAULT_MAX_RETRIES_BEFORE_FAILOVER,
+ DEFAULT_MAX_RETRIES_AFTER_FAILOVER);
JBossConnectionFactory jbcfBackup = new JBossConnectionFactory(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
backupParams),
null,
DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME,
DEFAULT_PING_PERIOD,
+ DEFAULT_CONNECTION_TTL,
DEFAULT_CALL_TIMEOUT,
null,
DEFAULT_ACK_BATCH_SIZE,
@@ -237,11 +240,11 @@
true,
DEFAULT_AUTO_GROUP,
DEFAULT_MAX_CONNECTIONS,
- DEFAULT_PRE_ACKNOWLEDGE,
- DEFAULT_RETRY_ON_FAILURE,
+ DEFAULT_PRE_ACKNOWLEDGE,
DEFAULT_RETRY_INTERVAL,
DEFAULT_RETRY_INTERVAL_MULTIPLIER,
- DEFAULT_MAX_RETRIES);
+ DEFAULT_MAX_RETRIES_BEFORE_FAILOVER,
+ DEFAULT_MAX_RETRIES_AFTER_FAILOVER);
Connection connLive = jbcfLive.createConnection();
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/jms/consumer/ConsumerTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/jms/consumer/ConsumerTest.java 2008-12-03 14:34:42 UTC (rev 5456)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/jms/consumer/ConsumerTest.java 2008-12-03 17:13:44 UTC (rev 5457)
@@ -27,37 +27,39 @@
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONNECTION_TTL;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONSUMER_MAX_RATE;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONSUMER_WINDOW_SIZE;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS;
-import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_AFTER_FAILOVER;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_BEFORE_FAILOVER;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PING_PERIOD;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRODUCER_MAX_RATE;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL_MULTIPLIER;
-import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_ON_FAILURE;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_SEND_WINDOW_SIZE;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
import junit.framework.TestCase;
+
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
+import org.jboss.messaging.jms.JBossQueue;
import org.jboss.messaging.jms.client.JBossConnectionFactory;
import org.jboss.messaging.jms.client.JBossSession;
import org.jboss.messaging.jms.server.impl.JMSServerManagerImpl;
-import org.jboss.messaging.jms.JBossQueue;
-import org.jboss.messaging.core.config.TransportConfiguration;
-import org.jboss.messaging.core.config.Configuration;
-import org.jboss.messaging.core.config.impl.ConfigurationImpl;
-import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
-import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
-import org.jboss.messaging.core.server.MessagingService;
import org.jboss.messaging.tests.integration.jms.management.NullInitialContext;
import org.jboss.messaging.util.SimpleString;
-import javax.jms.Connection;
-import javax.jms.Session;
-import javax.jms.MessageProducer;
-import javax.jms.MessageConsumer;
-import javax.jms.Message;
-
/**
* @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
*/
@@ -91,6 +93,7 @@
null,
DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME,
DEFAULT_PING_PERIOD,
+ DEFAULT_CONNECTION_TTL,
DEFAULT_CALL_TIMEOUT,
null,
DEFAULT_ACK_BATCH_SIZE,
@@ -105,11 +108,11 @@
true,
DEFAULT_AUTO_GROUP,
DEFAULT_MAX_CONNECTIONS,
- true,
- DEFAULT_RETRY_ON_FAILURE,
+ true,
DEFAULT_RETRY_INTERVAL,
DEFAULT_RETRY_INTERVAL_MULTIPLIER,
- DEFAULT_MAX_RETRIES);
+ DEFAULT_MAX_RETRIES_BEFORE_FAILOVER,
+ DEFAULT_MAX_RETRIES_AFTER_FAILOVER);
}
@Override
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/jms/management/JMSQueueControlTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/jms/management/JMSQueueControlTest.java 2008-12-03 14:34:42 UTC (rev 5456)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/jms/management/JMSQueueControlTest.java 2008-12-03 17:13:44 UTC (rev 5457)
@@ -28,17 +28,18 @@
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONNECTION_TTL;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONSUMER_MAX_RATE;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONSUMER_WINDOW_SIZE;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS;
-import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_AFTER_FAILOVER;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_BEFORE_FAILOVER;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PING_PERIOD;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRE_ACKNOWLEDGE;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRODUCER_MAX_RATE;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL_MULTIPLIER;
-import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_ON_FAILURE;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_SEND_WINDOW_SIZE;
import static org.jboss.messaging.tests.util.RandomUtil.randomString;
@@ -56,7 +57,6 @@
import junit.framework.TestCase;
-import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
import org.jboss.messaging.core.config.Configuration;
import org.jboss.messaging.core.config.TransportConfiguration;
import org.jboss.messaging.core.config.impl.ConfigurationImpl;
@@ -178,6 +178,7 @@
null,
DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME,
DEFAULT_PING_PERIOD,
+ DEFAULT_CONNECTION_TTL,
DEFAULT_CALL_TIMEOUT,
null,
DEFAULT_ACK_BATCH_SIZE,
@@ -192,11 +193,11 @@
true,
DEFAULT_AUTO_GROUP,
DEFAULT_MAX_CONNECTIONS,
- DEFAULT_PRE_ACKNOWLEDGE,
- DEFAULT_RETRY_ON_FAILURE,
+ DEFAULT_PRE_ACKNOWLEDGE,
DEFAULT_RETRY_INTERVAL,
DEFAULT_RETRY_INTERVAL_MULTIPLIER,
- DEFAULT_MAX_RETRIES);
+ DEFAULT_MAX_RETRIES_BEFORE_FAILOVER,
+ DEFAULT_MAX_RETRIES_AFTER_FAILOVER);
Connection conn = cf.createConnection();
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/jms/management/JMSUtil.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/jms/management/JMSUtil.java 2008-12-03 14:34:42 UTC (rev 5456)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/jms/management/JMSUtil.java 2008-12-03 17:13:44 UTC (rev 5457)
@@ -28,17 +28,18 @@
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONNECTION_TTL;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONSUMER_MAX_RATE;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONSUMER_WINDOW_SIZE;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS;
-import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_AFTER_FAILOVER;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_BEFORE_FAILOVER;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PING_PERIOD;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRE_ACKNOWLEDGE;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRODUCER_MAX_RATE;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL_MULTIPLIER;
-import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_ON_FAILURE;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_SEND_WINDOW_SIZE;
import static org.jboss.messaging.tests.util.RandomUtil.randomString;
@@ -51,7 +52,6 @@
import javax.jms.Topic;
import javax.jms.TopicSubscriber;
-import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
import org.jboss.messaging.core.config.TransportConfiguration;
import org.jboss.messaging.jms.client.JBossConnectionFactory;
@@ -79,6 +79,7 @@
null,
DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME,
DEFAULT_PING_PERIOD,
+ DEFAULT_CONNECTION_TTL,
DEFAULT_CALL_TIMEOUT,
null,
DEFAULT_ACK_BATCH_SIZE,
@@ -93,11 +94,11 @@
true,
DEFAULT_AUTO_GROUP,
DEFAULT_MAX_CONNECTIONS,
- DEFAULT_PRE_ACKNOWLEDGE,
- DEFAULT_RETRY_ON_FAILURE,
+ DEFAULT_PRE_ACKNOWLEDGE,
DEFAULT_RETRY_INTERVAL,
DEFAULT_RETRY_INTERVAL_MULTIPLIER,
- DEFAULT_MAX_RETRIES);
+ DEFAULT_MAX_RETRIES_BEFORE_FAILOVER,
+ DEFAULT_MAX_RETRIES_AFTER_FAILOVER);
return cf.createConnection();
}
@@ -108,6 +109,7 @@
null,
DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME,
DEFAULT_PING_PERIOD,
+ DEFAULT_CONNECTION_TTL,
DEFAULT_CALL_TIMEOUT,
null,
DEFAULT_ACK_BATCH_SIZE,
@@ -122,11 +124,11 @@
true,
DEFAULT_AUTO_GROUP,
DEFAULT_MAX_CONNECTIONS,
- DEFAULT_PRE_ACKNOWLEDGE,
- DEFAULT_RETRY_ON_FAILURE,
+ DEFAULT_PRE_ACKNOWLEDGE,
DEFAULT_RETRY_INTERVAL,
DEFAULT_RETRY_INTERVAL_MULTIPLIER,
- DEFAULT_MAX_RETRIES);
+ DEFAULT_MAX_RETRIES_BEFORE_FAILOVER,
+ DEFAULT_MAX_RETRIES_AFTER_FAILOVER);
Connection conn = cf.createConnection();
@@ -153,6 +155,7 @@
null,
DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME,
DEFAULT_PING_PERIOD,
+ DEFAULT_CONNECTION_TTL,
DEFAULT_CALL_TIMEOUT,
null,
DEFAULT_ACK_BATCH_SIZE,
@@ -167,11 +170,11 @@
true,
DEFAULT_AUTO_GROUP,
DEFAULT_MAX_CONNECTIONS,
- DEFAULT_PRE_ACKNOWLEDGE,
- DEFAULT_RETRY_ON_FAILURE,
+ DEFAULT_PRE_ACKNOWLEDGE,
DEFAULT_RETRY_INTERVAL,
DEFAULT_RETRY_INTERVAL_MULTIPLIER,
- DEFAULT_MAX_RETRIES);
+ DEFAULT_MAX_RETRIES_BEFORE_FAILOVER,
+ DEFAULT_MAX_RETRIES_AFTER_FAILOVER);
Connection conn = cf.createConnection();
@@ -187,6 +190,7 @@
null,
DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME,
DEFAULT_PING_PERIOD,
+ DEFAULT_CONNECTION_TTL,
DEFAULT_CALL_TIMEOUT,
null,
DEFAULT_ACK_BATCH_SIZE,
@@ -201,11 +205,11 @@
true,
DEFAULT_AUTO_GROUP,
DEFAULT_MAX_CONNECTIONS,
- DEFAULT_PRE_ACKNOWLEDGE,
- DEFAULT_RETRY_ON_FAILURE,
+ DEFAULT_PRE_ACKNOWLEDGE,
DEFAULT_RETRY_INTERVAL,
DEFAULT_RETRY_INTERVAL_MULTIPLIER,
- DEFAULT_MAX_RETRIES);
+ DEFAULT_MAX_RETRIES_BEFORE_FAILOVER,
+ DEFAULT_MAX_RETRIES_AFTER_FAILOVER);
Connection conn = cf.createConnection();
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/remoting/PingTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/remoting/PingTest.java 2008-12-03 14:34:42 UTC (rev 5456)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/remoting/PingTest.java 2008-12-03 17:13:44 UTC (rev 5457)
@@ -29,29 +29,26 @@
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_PERSISTENT_SEND;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONNECTION_TTL;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONSUMER_MAX_RATE;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONSUMER_WINDOW_SIZE;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS;
-import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_AFTER_FAILOVER;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_BEFORE_FAILOVER;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRE_ACKNOWLEDGE;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRODUCER_MAX_RATE;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL_MULTIPLIER;
-import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_ON_FAILURE;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_SEND_WINDOW_SIZE;
-import java.util.HashMap;
-import java.util.Map;
import java.util.Set;
import org.jboss.messaging.core.client.ClientSession;
import org.jboss.messaging.core.client.ClientSessionFactory;
-import org.jboss.messaging.core.client.ConnectionManager;
import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
import org.jboss.messaging.core.client.impl.ClientSessionFactoryInternal;
import org.jboss.messaging.core.client.impl.ClientSessionInternal;
-import org.jboss.messaging.core.client.impl.ConnectionManagerImpl;
import org.jboss.messaging.core.config.Configuration;
import org.jboss.messaging.core.config.TransportConfiguration;
import org.jboss.messaging.core.exception.MessagingException;
@@ -62,9 +59,7 @@
import org.jboss.messaging.core.remoting.RemotingConnection;
import org.jboss.messaging.core.remoting.impl.RemotingConnectionImpl;
import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
-import org.jboss.messaging.core.remoting.spi.ConnectorFactory;
import org.jboss.messaging.core.server.MessagingService;
-import org.jboss.messaging.integration.transports.netty.NettyConnectorFactory;
import org.jboss.messaging.tests.util.ServiceTestBase;
/**
@@ -134,6 +129,7 @@
null,
DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME,
PING_INTERVAL,
+ DEFAULT_CONNECTION_TTL,
DEFAULT_CALL_TIMEOUT,
DEFAULT_CONSUMER_WINDOW_SIZE,
DEFAULT_CONSUMER_MAX_RATE,
@@ -146,11 +142,11 @@
DEFAULT_AUTO_GROUP,
DEFAULT_MAX_CONNECTIONS,
DEFAULT_PRE_ACKNOWLEDGE,
- DEFAULT_ACK_BATCH_SIZE,
- DEFAULT_RETRY_ON_FAILURE,
+ DEFAULT_ACK_BATCH_SIZE,
DEFAULT_RETRY_INTERVAL,
DEFAULT_RETRY_INTERVAL_MULTIPLIER,
- DEFAULT_MAX_RETRIES);
+ DEFAULT_MAX_RETRIES_BEFORE_FAILOVER,
+ DEFAULT_MAX_RETRIES_AFTER_FAILOVER);
ClientSession session = csf.createSession(false, true, true);
@@ -208,6 +204,7 @@
null,
DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME,
PING_INTERVAL,
+ DEFAULT_CONNECTION_TTL,
DEFAULT_CALL_TIMEOUT,
DEFAULT_CONSUMER_WINDOW_SIZE,
DEFAULT_CONSUMER_MAX_RATE,
@@ -220,11 +217,11 @@
DEFAULT_AUTO_GROUP,
DEFAULT_MAX_CONNECTIONS,
DEFAULT_PRE_ACKNOWLEDGE,
- DEFAULT_ACK_BATCH_SIZE,
- DEFAULT_RETRY_ON_FAILURE,
+ DEFAULT_ACK_BATCH_SIZE,
DEFAULT_RETRY_INTERVAL,
DEFAULT_RETRY_INTERVAL_MULTIPLIER,
- DEFAULT_MAX_RETRIES);
+ DEFAULT_MAX_RETRIES_BEFORE_FAILOVER,
+ DEFAULT_MAX_RETRIES_AFTER_FAILOVER);
ClientSession session = csf.createSession(false, true, true);
@@ -282,6 +279,7 @@
null,
DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME,
PING_INTERVAL,
+ (long)(PING_INTERVAL * 1.5),
DEFAULT_CALL_TIMEOUT,
DEFAULT_CONSUMER_WINDOW_SIZE,
DEFAULT_CONSUMER_MAX_RATE,
@@ -294,11 +292,11 @@
DEFAULT_AUTO_GROUP,
DEFAULT_MAX_CONNECTIONS,
DEFAULT_PRE_ACKNOWLEDGE,
- DEFAULT_ACK_BATCH_SIZE,
- DEFAULT_RETRY_ON_FAILURE,
+ DEFAULT_ACK_BATCH_SIZE,
DEFAULT_RETRY_INTERVAL,
DEFAULT_RETRY_INTERVAL_MULTIPLIER,
- DEFAULT_MAX_RETRIES);
+ DEFAULT_MAX_RETRIES_BEFORE_FAILOVER,
+ DEFAULT_MAX_RETRIES_AFTER_FAILOVER);
Listener clientListener = new Listener();
@@ -377,6 +375,7 @@
null,
DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME,
PING_INTERVAL,
+ (long)(PING_INTERVAL * 1.5),
DEFAULT_CALL_TIMEOUT,
DEFAULT_CONSUMER_WINDOW_SIZE,
DEFAULT_CONSUMER_MAX_RATE,
@@ -389,11 +388,11 @@
DEFAULT_AUTO_GROUP,
DEFAULT_MAX_CONNECTIONS,
DEFAULT_PRE_ACKNOWLEDGE,
- DEFAULT_ACK_BATCH_SIZE,
- DEFAULT_RETRY_ON_FAILURE,
+ DEFAULT_ACK_BATCH_SIZE,
DEFAULT_RETRY_INTERVAL,
DEFAULT_RETRY_INTERVAL_MULTIPLIER,
- DEFAULT_MAX_RETRIES);
+ 0,
+ 0);
ClientSession session = csf.createSession(false, true, true);
More information about the jboss-cvs-commits
mailing list