[jboss-cvs] JBoss Messaging SVN: r5469 - in trunk: src/main/org/jboss/messaging/core/config and 10 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Sun Dec 7 09:27:40 EST 2008
Author: timfox
Date: 2008-12-07 09:27:39 -0500 (Sun, 07 Dec 2008)
New Revision: 5469
Added:
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowReconnectTest.java
Modified:
trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java
trunk/src/main/org/jboss/messaging/core/config/TransportConfiguration.java
trunk/src/main/org/jboss/messaging/core/config/cluster/MessageFlowConfiguration.java
trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java
trunk/src/main/org/jboss/messaging/core/remoting/Channel.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingServiceImpl.java
trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java
trunk/src/main/org/jboss/messaging/core/server/cluster/ClusterManager.java
trunk/src/main/org/jboss/messaging/core/server/cluster/MessageFlow.java
trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java
trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ForwarderImpl.java
trunk/src/main/org/jboss/messaging/core/server/cluster/impl/MessageFlowImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/BasicMessageFlowTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/DiscoveryFlowTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowBatchSizeTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowBatchTimeTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowTestBase.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowTransformerTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowWildcardTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowWithFilterTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/StaticFlowTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailureListenerOnFailoverTest.java
Log:
More reconnect stuff
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java 2008-12-05 17:26:32 UTC (rev 5468)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java 2008-12-07 14:27:39 UTC (rev 5469)
@@ -590,8 +590,6 @@
connection.destroy();
}
- log.info("done is " + done);
-
return done;
}
else
@@ -682,7 +680,7 @@
}
private RemotingConnection getConnectionWithRetry(final List<ClientSessionInternal> sessions, final int retries)
- {
+ {
long interval = retryInterval;
int count = 0;
@@ -696,7 +694,9 @@
// Failed to get backup connection
if (retries != 0)
- {
+ {
+ count++;
+
if (retries != -1 && count == retries)
{
log.warn("Retried " + retries + " times to reconnect. Now giving up.");
@@ -704,8 +704,6 @@
return null;
}
- count++;
-
log.warn("Now waiting " + interval + " ms before attempting reconnection.");
try
Modified: trunk/src/main/org/jboss/messaging/core/config/TransportConfiguration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/TransportConfiguration.java 2008-12-05 17:26:32 UTC (rev 5468)
+++ trunk/src/main/org/jboss/messaging/core/config/TransportConfiguration.java 2008-12-07 14:27:39 UTC (rev 5469)
@@ -74,8 +74,6 @@
return params;
}
- private int hash = -1;
-
public int hashCode()
{
return factoryClassName.hashCode();
Modified: trunk/src/main/org/jboss/messaging/core/config/cluster/MessageFlowConfiguration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/cluster/MessageFlowConfiguration.java 2008-12-05 17:26:32 UTC (rev 5468)
+++ trunk/src/main/org/jboss/messaging/core/config/cluster/MessageFlowConfiguration.java 2008-12-07 14:27:39 UTC (rev 5469)
@@ -58,6 +58,14 @@
private final String transformerClassName;
+ private final long retryInterval;
+
+ private final double retryIntervalMultiplier;
+
+ private final int maxRetriesBeforeFailover;
+
+ private final int maxRetriesAfterFailover;
+
public MessageFlowConfiguration(final String name,
final String address,
final String filterString,
@@ -65,6 +73,10 @@
final int maxBatchSize,
final long maxBatchTime,
final String transformerClassName,
+ final long retryInterval,
+ final double retryIntervalMultiplier,
+ final int maxRetriesBeforeFailover,
+ final int maxRetriesAfterFailover,
final List<Pair<String, String>> staticConnectorNamePairs)
{
this.name = name;
@@ -74,6 +86,10 @@
this.maxBatchSize = maxBatchSize;
this.maxBatchTime = maxBatchTime;
this.transformerClassName = transformerClassName;
+ this.retryInterval = retryInterval;
+ this.retryIntervalMultiplier = retryIntervalMultiplier;
+ this.maxRetriesBeforeFailover = maxRetriesBeforeFailover;
+ this.maxRetriesAfterFailover = maxRetriesAfterFailover;
this.staticConnectorNamePairs = staticConnectorNamePairs;
this.discoveryGroupName = null;
}
@@ -85,6 +101,10 @@
final int maxBatchSize,
final long maxBatchTime,
final String transformerClassName,
+ final long retryInterval,
+ final double retryIntervalMultiplier,
+ final int maxRetriesBeforeFailover,
+ final int maxRetriesAfterFailover,
final String discoveryGroupName)
{
this.name = name;
@@ -94,6 +114,10 @@
this.maxBatchSize = maxBatchSize;
this.maxBatchTime = maxBatchTime;
this.transformerClassName = transformerClassName;
+ this.retryInterval = retryInterval;
+ this.retryIntervalMultiplier = retryIntervalMultiplier;
+ this.maxRetriesBeforeFailover = maxRetriesBeforeFailover;
+ this.maxRetriesAfterFailover = maxRetriesAfterFailover;
this.staticConnectorNamePairs = null;
this.discoveryGroupName = discoveryGroupName;
}
@@ -142,4 +166,29 @@
{
return this.discoveryGroupName;
}
+
+ public List<Pair<String, String>> getStaticConnectorNamePairs()
+ {
+ return staticConnectorNamePairs;
+ }
+
+ public long getRetryInterval()
+ {
+ return retryInterval;
+ }
+
+ public double getRetryIntervalMultiplier()
+ {
+ return retryIntervalMultiplier;
+ }
+
+ public int getMaxRetriesBeforeFailover()
+ {
+ return maxRetriesBeforeFailover;
+ }
+
+ public int getMaxRetriesAfterFailover()
+ {
+ return maxRetriesAfterFailover;
+ }
}
Modified: trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java 2008-12-05 17:26:32 UTC (rev 5468)
+++ trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java 2008-12-07 14:27:39 UTC (rev 5469)
@@ -22,6 +22,11 @@
package org.jboss.messaging.core.config.impl;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_AFTER_FAILOVER;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_BEFORE_FAILOVER;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL_MULTIPLIER;
+
import java.io.InputStreamReader;
import java.io.Reader;
import java.net.URL;
@@ -30,6 +35,7 @@
import java.util.List;
import java.util.Map;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
import org.jboss.messaging.core.config.TransportConfiguration;
import org.jboss.messaging.core.config.cluster.BroadcastGroupConfiguration;
import org.jboss.messaging.core.config.cluster.DiscoveryGroupConfiguration;
@@ -514,6 +520,14 @@
String discoveryGroupName = null;
String transformerClassName = null;
+
+ long retryInterval = DEFAULT_RETRY_INTERVAL;
+
+ double retryIntervalMultiplier = DEFAULT_RETRY_INTERVAL_MULTIPLIER;
+
+ int maxRetriesBeforeFailover = DEFAULT_MAX_RETRIES_BEFORE_FAILOVER;
+
+ int maxRetriesAfterFailover = DEFAULT_MAX_RETRIES_AFTER_FAILOVER;
NodeList children = bgNode.getChildNodes();
@@ -549,6 +563,22 @@
{
transformerClassName = child.getTextContent().trim();
}
+ else if (child.getNodeName().equals("retry-interval"))
+ {
+ retryInterval = XMLUtil.parseLong(child);
+ }
+ else if (child.getNodeName().equals("retry-interval-multiplier"))
+ {
+ retryIntervalMultiplier = XMLUtil.parseDouble(child);
+ }
+ else if (child.getNodeName().equals("max-retries-before-failover"))
+ {
+ maxRetriesBeforeFailover = XMLUtil.parseInt(child);
+ }
+ else if (child.getNodeName().equals("max-retries-after-failover"))
+ {
+ maxRetriesAfterFailover = XMLUtil.parseInt(child);
+ }
else if (child.getNodeName().equals("connector"))
{
String connectorName = child.getAttributes().getNamedItem("connector-name").getNodeValue();
@@ -577,6 +607,10 @@
maxBatchSize,
maxBatchTime,
transformerClassName,
+ retryInterval,
+ retryIntervalMultiplier,
+ maxRetriesBeforeFailover,
+ maxRetriesAfterFailover,
staticConnectorNames);
}
else
@@ -588,6 +622,10 @@
maxBatchSize,
maxBatchTime,
transformerClassName,
+ retryInterval,
+ retryIntervalMultiplier,
+ maxRetriesBeforeFailover,
+ maxRetriesAfterFailover,
discoveryGroupName);
}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/Channel.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/Channel.java 2008-12-05 17:26:32 UTC (rev 5468)
+++ trunk/src/main/org/jboss/messaging/core/remoting/Channel.java 2008-12-07 14:27:39 UTC (rev 5469)
@@ -36,8 +36,6 @@
void close();
- void fail();
-
Channel getReplicatingChannel();
void transferConnection(RemotingConnection newConnection);
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2008-12-05 17:26:32 UTC (rev 5468)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2008-12-07 14:27:39 UTC (rev 5469)
@@ -444,12 +444,7 @@
// Then call the listeners
callListeners(me);
- internalClose();
-
- for (Channel channel : channels.values())
- {
- channel.fail();
- }
+ internalClose();
}
public void destroy()
@@ -1045,8 +1040,6 @@
public Packet sendBlocking(final Packet packet) throws MessagingException
{
- // System.identityHashCode(this.connection) + " " + packet.getType());
-
if (closed)
{
throw new MessagingException(MessagingException.NOT_CONNECTED, "Connection is destroyed");
@@ -1203,7 +1196,7 @@
}
public void replicateComplete()
- {
+ {
if (!connection.active)
{
// We're on backup so send back a replication response
@@ -1317,10 +1310,6 @@
closed = true;
}
- public void fail()
- {
- }
-
public Channel getReplicatingChannel()
{
return replicatingChannel;
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingServiceImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingServiceImpl.java 2008-12-05 17:26:32 UTC (rev 5468)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingServiceImpl.java 2008-12-07 14:27:39 UTC (rev 5469)
@@ -103,7 +103,7 @@
connectionTTL = config.getConnectionTTLOverride();
- backup = config.isBackup();
+ backup = config.isBackup();
}
// RemotingService implementation -------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java 2008-12-05 17:26:32 UTC (rev 5468)
+++ trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java 2008-12-07 14:27:39 UTC (rev 5469)
@@ -26,6 +26,7 @@
import org.jboss.messaging.core.remoting.impl.wireformat.ReattachSessionResponseMessage;
import org.jboss.messaging.core.security.JBMSecurityManager;
import org.jboss.messaging.core.security.Role;
+import org.jboss.messaging.core.server.cluster.ClusterManager;
import org.jboss.messaging.core.settings.HierarchicalRepository;
import org.jboss.messaging.core.settings.impl.QueueSettings;
import org.jboss.messaging.core.transaction.ResourceManager;
@@ -110,4 +111,6 @@
ResourceManager getResourceManager();
List<ServerSession> getSessions(String connectionID);
+
+ ClusterManager getClusterManager();
}
Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/ClusterManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/ClusterManager.java 2008-12-05 17:26:32 UTC (rev 5468)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/ClusterManager.java 2008-12-07 14:27:39 UTC (rev 5469)
@@ -20,12 +20,10 @@
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
-
package org.jboss.messaging.core.server.cluster;
-import org.jboss.messaging.core.config.cluster.BroadcastGroupConfiguration;
-import org.jboss.messaging.core.config.cluster.DiscoveryGroupConfiguration;
-import org.jboss.messaging.core.config.cluster.MessageFlowConfiguration;
+import java.util.Map;
+
import org.jboss.messaging.core.server.MessagingComponent;
/**
@@ -35,8 +33,8 @@
*
* Created 18 Nov 2008 09:23:26
*
- *
*/
public interface ClusterManager extends MessagingComponent
{
+ Map<String, MessageFlow> getMessageFlows();
}
Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/MessageFlow.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/MessageFlow.java 2008-12-05 17:26:32 UTC (rev 5468)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/MessageFlow.java 2008-12-07 14:27:39 UTC (rev 5469)
@@ -23,6 +23,8 @@
package org.jboss.messaging.core.server.cluster;
+import java.util.Set;
+
import org.jboss.messaging.core.server.MessagingComponent;
/**
@@ -35,7 +37,6 @@
*
*/
public interface MessageFlow extends MessagingComponent
-{
-
-
+{
+ Set<Forwarder> getForwarders();
}
Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java 2008-12-05 17:26:32 UTC (rev 5468)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java 2008-12-07 14:27:39 UTC (rev 5469)
@@ -84,7 +84,7 @@
private final Configuration configuration;
private volatile boolean started;
-
+
public ClusterManagerImpl(final ExecutorFactory executorFactory,
final StorageManager storageManager,
final PostOffice postOffice,
@@ -165,6 +165,11 @@
{
return started;
}
+
+ public Map<String, MessageFlow> getMessageFlows()
+ {
+ return new HashMap<String, MessageFlow>(messageFlows);
+ }
private synchronized void deployBroadcastGroup(final BroadcastGroupConfiguration config) throws Exception
{
@@ -359,6 +364,10 @@
queueSettingsRepository,
scheduledExecutor,
transformer,
+ config.getRetryInterval(),
+ config.getRetryIntervalMultiplier(),
+ config.getMaxRetriesBeforeFailover(),
+ config.getMaxRetriesAfterFailover(),
conns);
}
else
@@ -388,6 +397,10 @@
queueSettingsRepository,
scheduledExecutor,
transformer,
+ config.getRetryInterval(),
+ config.getRetryIntervalMultiplier(),
+ config.getMaxRetriesBeforeFailover(),
+ config.getMaxRetriesAfterFailover(),
group);
}
Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ForwarderImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ForwarderImpl.java 2008-12-05 17:26:32 UTC (rev 5468)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ForwarderImpl.java 2008-12-07 14:27:39 UTC (rev 5469)
@@ -32,10 +32,14 @@
import org.jboss.messaging.core.client.ClientSession;
import org.jboss.messaging.core.client.ClientSessionFactory;
import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.client.impl.ClientSessionImpl;
import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.PostOffice;
+import org.jboss.messaging.core.remoting.FailureListener;
+import org.jboss.messaging.core.remoting.RemotingConnection;
import org.jboss.messaging.core.server.HandleStatus;
import org.jboss.messaging.core.server.MessageReference;
import org.jboss.messaging.core.server.Queue;
@@ -58,7 +62,7 @@
*
*
*/
-public class ForwarderImpl implements Forwarder
+public class ForwarderImpl implements Forwarder, FailureListener
{
// Constants -----------------------------------------------------
@@ -81,25 +85,25 @@
private java.util.Queue<MessageReference> refs = new LinkedList<MessageReference>();
private Transaction tx;
-
+
private long lastReceivedTime = -1;
-
+
private final StorageManager storageManager;
private final PostOffice postOffice;
private final HierarchicalRepository<QueueSettings> queueSettingsRepository;
-
+
private final Transformer transformer;
private final ClientSessionFactory csf;
-
+
private ClientSession session;
private ClientProducer producer;
-
+
private volatile boolean started;
-
+
private final ScheduledFuture<?> future;
// Static --------------------------------------------------------
@@ -109,15 +113,19 @@
// Public --------------------------------------------------------
public ForwarderImpl(final Queue queue,
- final Pair<TransportConfiguration,TransportConfiguration> connectorPair,
+ final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
final Executor executor,
final int maxBatchSize,
final long maxBatchTime,
final StorageManager storageManager,
final PostOffice postOffice,
- final HierarchicalRepository<QueueSettings> queueSettingsRepository,
+ final HierarchicalRepository<QueueSettings> queueSettingsRepository,
final ScheduledExecutorService scheduledExecutor,
- final Transformer transformer) throws Exception
+ final Transformer transformer,
+ final long retryInterval,
+ final double retryIntervalMultiplier,
+ final int maxRetriesBeforeFailover,
+ final int maxRetriesAfterFailover)
{
this.queue = queue;
@@ -132,36 +140,42 @@
this.postOffice = postOffice;
this.queueSettingsRepository = queueSettingsRepository;
-
+
this.transformer = transformer;
-
- this.csf = new ClientSessionFactoryImpl(connectorPair.a, connectorPair.b);
-
+
+ this.csf = new ClientSessionFactoryImpl(connectorPair.a,
+ connectorPair.b,
+ retryInterval,
+ retryIntervalMultiplier,
+ maxRetriesBeforeFailover,
+ maxRetriesAfterFailover);
+
if (maxBatchTime != -1)
{
- future = scheduledExecutor.scheduleAtFixedRate(new BatchTimeout(), maxBatchTime, maxBatchTime, TimeUnit.MILLISECONDS);
+ future = scheduledExecutor.scheduleAtFixedRate(new BatchTimeout(),
+ maxBatchTime,
+ maxBatchTime,
+ TimeUnit.MILLISECONDS);
}
else
{
future = null;
}
}
-
+
public synchronized void start() throws Exception
{
if (started)
{
return;
}
-
+
createTx();
-
- session = csf.createSession(false, false, false);
- producer = session.createProducer(null);
+ createObjects();
- queue.addConsumer(this);
-
+ queue.addConsumer(this);
+
started = true;
}
@@ -170,7 +184,7 @@
started = false;
queue.removeConsumer(this);
-
+
if (future != null)
{
future.cancel(false);
@@ -188,19 +202,25 @@
{
log.warn("Timed out waiting for batch to be sent");
}
-
+
session.close();
-
+
started = false;
}
-
+
public boolean isStarted()
{
return started;
}
+
+ //For testing only
+ public RemotingConnection getForwardingConnection()
+ {
+ return ((ClientSessionImpl)session).getConnection();
+ }
// Consumer implementation ---------------------------------------
-
+
public HandleStatus handle(final MessageReference reference) throws Exception
{
if (busy)
@@ -216,7 +236,7 @@
}
refs.add(reference);
-
+
if (maxBatchTime != -1)
{
lastReceivedTime = System.currentTimeMillis();
@@ -235,75 +255,114 @@
}
}
+ // FailureListener implementation --------------------------------
+
+ public synchronized boolean connectionFailed(final MessagingException me)
+ {
+ //By the time this is called
+ synchronized (this)
+ {
+ try
+ {
+ session.close();
+
+ createObjects();
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to reconnect", e);
+ }
+
+ return true;
+ }
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
// Private -------------------------------------------------------
+ private void createObjects() throws Exception
+ {
+ try
+ {
+ session = csf.createSession(false, false, false);
+ }
+ catch (MessagingException me)
+ {
+ log.warn("Unable to connect. Message flow is now disabled.");
+
+ stop();
+
+ return;
+ }
+
+ session.addFailureListener(this);
+
+ producer = session.createProducer(null);
+ }
+
private synchronized void timeoutBatch()
{
if (!started)
{
return;
}
-
+
if (lastReceivedTime != -1 && count > 0)
{
long now = System.currentTimeMillis();
-
+
if (now - lastReceivedTime >= maxBatchTime)
{
sendBatch();
}
- }
+ }
}
-
- private void sendBatch()
+
+ private synchronized void sendBatch()
{
try
{
- synchronized (this)
+ if (count == 0)
{
- if (count == 0)
+ return;
+ }
+
+ // TODO - duplicate detection on sendee and if batch size = 1 then don't need tx
+
+ while (true)
+ {
+ MessageReference ref = refs.poll();
+
+ if (ref == null)
{
- return;
+ break;
}
-
- // TODO - duplicate detection on sendee and if batch size = 1 then don't need tx
- while (true)
- {
- MessageReference ref = refs.poll();
+ tx.addAcknowledgement(ref);
- if (ref == null)
- {
- break;
- }
+ ServerMessage message = ref.getMessage();
- tx.addAcknowledgement(ref);
-
- ServerMessage message = ref.getMessage();
-
- if (transformer != null)
- {
- message = transformer.transform(message);
- }
-
- producer.send(message.getDestination(), message);
+ if (transformer != null)
+ {
+ message = transformer.transform(message);
}
- session.commit();
+ producer.send(message.getDestination(), message);
+ }
- tx.commit();
+ session.commit();
- createTx();
+ tx.commit();
- busy = false;
+ createTx();
- count = 0;
- }
+ busy = false;
+ count = 0;
+
queue.deliverAsync(executor);
}
catch (Exception e)
@@ -335,7 +394,7 @@
sendBatch();
}
}
-
+
private class BatchTimeout implements Runnable
{
public void run()
Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/MessageFlowImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/MessageFlowImpl.java 2008-12-05 17:26:32 UTC (rev 5468)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/MessageFlowImpl.java 2008-12-07 14:27:39 UTC (rev 5469)
@@ -92,6 +92,14 @@
private volatile boolean started;
+ private final long retryInterval;
+
+ private final double retryIntervalMultiplier;
+
+ private final int maxRetriesBeforeFailover;
+
+ private final int maxRetriesAfterFailover;
+
/*
* Constructor using static list of connectors
*/
@@ -107,7 +115,11 @@
final HierarchicalRepository<QueueSettings> queueSettingsRepository,
final ScheduledExecutorService scheduledExecutor,
final Transformer transformer,
- final List<Pair<TransportConfiguration,TransportConfiguration>> connectors) throws Exception
+ final long retryInterval,
+ final double retryIntervalMultiplier,
+ final int maxRetriesBeforeFailover,
+ final int maxRetriesAfterFailover,
+ final List<Pair<TransportConfiguration, TransportConfiguration>> connectors) throws Exception
{
this.name = name;
@@ -135,6 +147,14 @@
this.scheduledExecutor = scheduledExecutor;
+ this.retryInterval = retryInterval;
+
+ this.retryIntervalMultiplier = retryIntervalMultiplier;
+
+ this.maxRetriesBeforeFailover = maxRetriesBeforeFailover;
+
+ this.maxRetriesAfterFailover = maxRetriesAfterFailover;
+
this.updateConnectors(connectors);
}
@@ -153,6 +173,10 @@
final HierarchicalRepository<QueueSettings> queueSettingsRepository,
final ScheduledExecutorService scheduledExecutor,
final Transformer transformer,
+ final long retryInterval,
+ final double retryIntervalMultiplier,
+ final int maxRetriesBeforeFailover,
+ final int maxRetriesAfterFailover,
final DiscoveryGroup discoveryGroup) throws Exception
{
this.name = name;
@@ -180,6 +204,14 @@
this.transformer = transformer;
this.discoveryGroup = discoveryGroup;
+
+ this.retryInterval = retryInterval;
+
+ this.retryIntervalMultiplier = retryIntervalMultiplier;
+
+ this.maxRetriesBeforeFailover = maxRetriesBeforeFailover;
+
+ this.maxRetriesAfterFailover = maxRetriesAfterFailover;
}
public synchronized void start() throws Exception
@@ -223,6 +255,12 @@
{
return started;
}
+
+ //For testing only
+ public Set<Forwarder> getForwarders()
+ {
+ return new HashSet<Forwarder>(forwarders.values());
+ }
// DiscoveryListener implementation ------------------------------------------------------------------
@@ -246,11 +284,12 @@
connectorSet.addAll(connectors);
- Iterator<Map.Entry<Pair<TransportConfiguration,TransportConfiguration>, Forwarder>> iter = forwarders.entrySet().iterator();
+ Iterator<Map.Entry<Pair<TransportConfiguration, TransportConfiguration>, Forwarder>> iter = forwarders.entrySet()
+ .iterator();
while (iter.hasNext())
{
- Map.Entry<Pair<TransportConfiguration,TransportConfiguration>, Forwarder> entry = iter.next();
+ Map.Entry<Pair<TransportConfiguration, TransportConfiguration>, Forwarder> entry = iter.next();
if (!connectorSet.contains(entry.getKey()))
{
@@ -262,7 +301,7 @@
}
}
- for (Pair<TransportConfiguration,TransportConfiguration> connectorPair : connectors)
+ for (Pair<TransportConfiguration, TransportConfiguration> connectorPair : connectors)
{
if (!forwarders.containsKey(connectorPair))
{
@@ -290,7 +329,11 @@
postOffice,
queueSettingsRepository,
scheduledExecutor,
- transformer);
+ transformer,
+ retryInterval,
+ retryIntervalMultiplier,
+ maxRetriesBeforeFailover,
+ maxRetriesAfterFailover);
forwarders.put(connectorPair, forwarder);
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2008-12-05 17:26:32 UTC (rev 5468)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2008-12-07 14:27:39 UTC (rev 5469)
@@ -452,6 +452,11 @@
{
return started;
}
+
+ public ClusterManager getClusterManager()
+ {
+ return clusterManager;
+ }
private synchronized void checkActivate(final RemotingConnection connection)
{
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java 2008-12-05 17:26:32 UTC (rev 5468)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java 2008-12-07 14:27:39 UTC (rev 5469)
@@ -65,7 +65,7 @@
if (packet.getType() == PacketImpl.CREATESESSION && channel1.getReplicatingChannel() != null)
{
CreateSessionMessage msg = (CreateSessionMessage)packet;
-
+
Packet replPacket = new ReplicateCreateSessionMessage(msg.getName(), msg.getSessionChannelID(),
msg.getVersion(), msg.getUsername(),
msg.getPassword(), msg.getMinLargeMessageSize(),
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/BasicMessageFlowTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/BasicMessageFlowTest.java 2008-12-05 17:26:32 UTC (rev 5468)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/BasicMessageFlowTest.java 2008-12-07 14:27:39 UTC (rev 5469)
@@ -22,6 +22,11 @@
package org.jboss.messaging.tests.integration.cluster.distribution;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_AFTER_FAILOVER;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_BEFORE_FAILOVER;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL_MULTIPLIER;
+
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -98,6 +103,10 @@
1,
-1,
null,
+ DEFAULT_RETRY_INTERVAL,
+ DEFAULT_RETRY_INTERVAL_MULTIPLIER,
+ DEFAULT_MAX_RETRIES_BEFORE_FAILOVER,
+ DEFAULT_MAX_RETRIES_AFTER_FAILOVER,
connectorNames);
MessageFlowConfiguration ofconfig2 = new MessageFlowConfiguration("flow1",
address1.toString(),
@@ -106,6 +115,10 @@
1,
-1,
null,
+ DEFAULT_RETRY_INTERVAL,
+ DEFAULT_RETRY_INTERVAL_MULTIPLIER,
+ DEFAULT_MAX_RETRIES_BEFORE_FAILOVER,
+ DEFAULT_MAX_RETRIES_AFTER_FAILOVER,
connectorNames);
Set<MessageFlowConfiguration> ofconfigs = new HashSet<MessageFlowConfiguration>();
@@ -193,6 +206,10 @@
1,
-1,
null,
+ DEFAULT_RETRY_INTERVAL,
+ DEFAULT_RETRY_INTERVAL_MULTIPLIER,
+ DEFAULT_MAX_RETRIES_BEFORE_FAILOVER,
+ DEFAULT_MAX_RETRIES_AFTER_FAILOVER,
connectorNames);
Set<MessageFlowConfiguration> ofconfigs = new HashSet<MessageFlowConfiguration>();
@@ -267,6 +284,10 @@
1,
-1,
null,
+ DEFAULT_RETRY_INTERVAL,
+ DEFAULT_RETRY_INTERVAL_MULTIPLIER,
+ DEFAULT_MAX_RETRIES_BEFORE_FAILOVER,
+ DEFAULT_MAX_RETRIES_AFTER_FAILOVER,
connectorNames);
Set<MessageFlowConfiguration> ofconfigs = new HashSet<MessageFlowConfiguration>();
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/DiscoveryFlowTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/DiscoveryFlowTest.java 2008-12-05 17:26:32 UTC (rev 5468)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/DiscoveryFlowTest.java 2008-12-07 14:27:39 UTC (rev 5469)
@@ -22,6 +22,11 @@
package org.jboss.messaging.tests.integration.cluster.distribution;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_AFTER_FAILOVER;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_BEFORE_FAILOVER;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL_MULTIPLIER;
+
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -192,6 +197,10 @@
1,
-1,
null,
+ DEFAULT_RETRY_INTERVAL,
+ DEFAULT_RETRY_INTERVAL_MULTIPLIER,
+ DEFAULT_MAX_RETRIES_BEFORE_FAILOVER,
+ DEFAULT_MAX_RETRIES_AFTER_FAILOVER,
discoveryGroupName);
Set<MessageFlowConfiguration> ofconfigs = new HashSet<MessageFlowConfiguration>();
ofconfigs.add(ofconfig);
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowBatchSizeTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowBatchSizeTest.java 2008-12-05 17:26:32 UTC (rev 5468)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowBatchSizeTest.java 2008-12-07 14:27:39 UTC (rev 5469)
@@ -22,6 +22,11 @@
package org.jboss.messaging.tests.integration.cluster.distribution;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_AFTER_FAILOVER;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_BEFORE_FAILOVER;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL_MULTIPLIER;
+
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -97,6 +102,10 @@
batchSize,
-1,
null,
+ DEFAULT_RETRY_INTERVAL,
+ DEFAULT_RETRY_INTERVAL_MULTIPLIER,
+ DEFAULT_MAX_RETRIES_BEFORE_FAILOVER,
+ DEFAULT_MAX_RETRIES_AFTER_FAILOVER,
connectorNames);
Set<MessageFlowConfiguration> ofconfigs = new HashSet<MessageFlowConfiguration>();
ofconfigs.add(ofconfig);
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowBatchTimeTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowBatchTimeTest.java 2008-12-05 17:26:32 UTC (rev 5468)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowBatchTimeTest.java 2008-12-07 14:27:39 UTC (rev 5469)
@@ -22,6 +22,11 @@
package org.jboss.messaging.tests.integration.cluster.distribution;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_AFTER_FAILOVER;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_BEFORE_FAILOVER;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL_MULTIPLIER;
+
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -98,6 +103,10 @@
batchSize,
batchTime,
null,
+ DEFAULT_RETRY_INTERVAL,
+ DEFAULT_RETRY_INTERVAL_MULTIPLIER,
+ DEFAULT_MAX_RETRIES_BEFORE_FAILOVER,
+ DEFAULT_MAX_RETRIES_AFTER_FAILOVER,
connectorNames);
Set<MessageFlowConfiguration> ofconfigs = new HashSet<MessageFlowConfiguration>();
ofconfigs.add(ofconfig);
Added: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowReconnectTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowReconnectTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowReconnectTest.java 2008-12-07 14:27:39 UTC (rev 5469)
@@ -0,0 +1,332 @@
+/*
+ * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat
+ * Middleware LLC, and individual contributors by the @authors tag. See the
+ * copyright.txt in the distribution for a full listing of individual
+ * contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it under the
+ * terms of the GNU Lesser General Public License as published by the Free
+ * Software Foundation; either version 2.1 of the License, or (at your option)
+ * any later version.
+ *
+ * This software is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+ * details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this software; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
+ * site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.cluster.distribution;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.cluster.MessageFlowConfiguration;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.impl.invm.InVMConnector;
+import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.server.cluster.Forwarder;
+import org.jboss.messaging.core.server.cluster.MessageFlow;
+import org.jboss.messaging.core.server.cluster.impl.ForwarderImpl;
+import org.jboss.messaging.util.Pair;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ *
+ * A MessageFlowReconnectTest
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ * Created 7 Dec 2008 11:48:30
+ *
+ *
+ */
+public class MessageFlowReconnectTest extends MessageFlowTestBase
+{
+ private static final Logger log = Logger.getLogger(MessageFlowReconnectTest.class);
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public void testAutomaticReconnectBeforeFailover() throws Exception
+ {
+ Map<String, Object> service0Params = new HashMap<String, Object>();
+ MessagingService service0 = createMessagingService(0, service0Params);
+
+ Map<String, Object> service1Params = new HashMap<String, Object>();
+ MessagingService service1 = createMessagingService(1, service1Params);
+ service1.start();
+
+ Map<String, Object> service2Params = new HashMap<String, Object>();
+ MessagingService service2 = createMessagingService(2, service2Params);
+ service2.start();
+
+ TransportConfiguration server0tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+ service0Params,
+ "server0tc");
+
+ Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
+
+ TransportConfiguration server1tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+ service1Params,
+ "server1tc");
+
+ TransportConfiguration server2tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+ service2Params,
+ "server2tc");
+
+ connectors.put(server1tc.getName(), server1tc);
+
+ connectors.put(server2tc.getName(), server2tc);
+
+ service0.getServer().getConfiguration().setConnectorConfigurations(connectors);
+
+ List<Pair<String, String>> connectorNames = new ArrayList<Pair<String, String>>();
+ connectorNames.add(new Pair<String, String>(server1tc.getName(), server2tc.getName()));
+
+ final SimpleString address1 = new SimpleString("testaddress");
+
+ final long retryInterval = 50;
+ final double retryIntervalMultiplier = 1d;
+ final int retriesBeforeFailover = 3;
+ final int maxRetriesAfterFailover = -1;
+
+ final String flowName = "flow1";
+
+ MessageFlowConfiguration ofconfig1 = new MessageFlowConfiguration(flowName,
+ address1.toString(),
+ null,
+ true,
+ 1,
+ -1,
+ null,
+ retryInterval,
+ retryIntervalMultiplier,
+ retriesBeforeFailover,
+ maxRetriesAfterFailover,
+ connectorNames);
+
+ Set<MessageFlowConfiguration> ofconfigs = new HashSet<MessageFlowConfiguration>();
+ ofconfigs.add(ofconfig1);
+
+ service0.getServer().getConfiguration().setMessageFlowConfigurations(ofconfigs);
+
+ service0.start();
+
+ ClientSessionFactory csf0 = new ClientSessionFactoryImpl(server0tc);
+ ClientSession session0 = csf0.createSession(false, true, true);
+
+ ClientSessionFactory csf1 = new ClientSessionFactoryImpl(server1tc);
+ ClientSession session1 = csf1.createSession(false, true, true);
+
+ session0.createQueue(address1, address1, null, false, false, false);
+ session1.createQueue(address1, address1, null, false, false, false);
+ ClientProducer prod0 = session0.createProducer(address1);
+
+ ClientConsumer cons1 = session1.createConsumer(address1);
+
+ session1.start();
+
+ ClientMessage message = session0.createClientMessage(false);
+ SimpleString propKey = new SimpleString("propkey");
+ SimpleString propVal = new SimpleString("propval");
+ message.putStringProperty(propKey, propVal);
+ message.getBody().flip();
+
+ //Now we will simulate a failure of the message flow connection between server1 and server2
+ //And prevent reconnection for a few tries, then it will reconnect without failing over
+ MessageFlow flow = service0.getServer().getClusterManager().getMessageFlows().get(flowName);
+ Forwarder forwarder = flow.getForwarders().iterator().next();
+ RemotingConnection forwardingConnection = ((ForwarderImpl)forwarder).getForwardingConnection();
+ InVMConnector.failOnCreateConnection = true;
+ InVMConnector.numberOfFailures = retriesBeforeFailover - 1;
+ forwardingConnection.fail(new MessagingException(MessagingException.NOT_CONNECTED));
+
+ prod0.send(message);
+
+ ClientMessage r1 = cons1.receive(1000);
+ assertNotNull(r1);
+ assertEquals(propVal, r1.getProperty(propKey));
+
+ session0.close();
+ session1.close();
+
+ service0.stop();
+ service1.stop();
+ service2.stop();
+
+ assertEquals(0, service0.getServer().getRemotingService().getConnections().size());
+ assertEquals(0, service1.getServer().getRemotingService().getConnections().size());
+ assertEquals(0, service2.getServer().getRemotingService().getConnections().size());
+ }
+
+ public void testAutomaticReconnectTryThenFailover() throws Exception
+ {
+ Map<String, Object> service0Params = new HashMap<String, Object>();
+ MessagingService service0 = createMessagingService(0, service0Params);
+
+ Map<String, Object> service1Params = new HashMap<String, Object>();
+ MessagingService service1 = createMessagingService(1, service1Params);
+
+ Map<String, Object> service2Params = new HashMap<String, Object>();
+ MessagingService service2 = createMessagingService(2, service2Params, true);
+
+ TransportConfiguration server0tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+ service0Params,
+ "server0tc");
+
+ Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
+
+ TransportConfiguration server1tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+ service1Params,
+ "server1tc");
+
+ TransportConfiguration server2tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+ service2Params,
+ "server2tc");
+
+ connectors.put(server1tc.getName(), server1tc);
+
+ connectors.put(server2tc.getName(), server2tc);
+
+ service1.getServer().getConfiguration().setConnectorConfigurations(connectors);
+
+ service1.getServer().getConfiguration().setBackupConnectorName(server2tc.getName());
+
+ service2.getServer().getConfiguration().setBackup(true);
+
+ service1.start();
+
+ service2.start();
+
+ log.info("Started service1 and service2");
+
+ service0.getServer().getConfiguration().setConnectorConfigurations(connectors);
+
+ List<Pair<String, String>> connectorNames = new ArrayList<Pair<String, String>>();
+ connectorNames.add(new Pair<String, String>(server1tc.getName(), server2tc.getName()));
+
+ final SimpleString address1 = new SimpleString("testaddress");
+
+ final long retryInterval = 50;
+ final double retryIntervalMultiplier = 1d;
+ final int retriesBeforeFailover = 3;
+ final int maxRetriesAfterFailover = -1;
+
+ final String flowName = "flow1";
+
+ MessageFlowConfiguration ofconfig1 = new MessageFlowConfiguration(flowName,
+ address1.toString(),
+ null,
+ true,
+ 1,
+ -1,
+ null,
+ retryInterval,
+ retryIntervalMultiplier,
+ retriesBeforeFailover,
+ maxRetriesAfterFailover,
+ connectorNames);
+
+ Set<MessageFlowConfiguration> ofconfigs = new HashSet<MessageFlowConfiguration>();
+ ofconfigs.add(ofconfig1);
+
+ service0.getServer().getConfiguration().setMessageFlowConfigurations(ofconfigs);
+
+ service0.start();
+
+ log.info("started service0");
+
+ ClientSessionFactory csf0 = new ClientSessionFactoryImpl(server0tc);
+ ClientSession session0 = csf0.createSession(false, true, true);
+
+ ClientSessionFactory csf2 = new ClientSessionFactoryImpl(server2tc);
+ ClientSession session2 = csf2.createSession(false, true, true);
+
+ session0.createQueue(address1, address1, null, false, false, false);
+ session2.createQueue(address1, address1, null, false, false, false);
+ ClientProducer prod0 = session0.createProducer(address1);
+
+ ClientConsumer cons1 = session2.createConsumer(address1);
+
+ session2.start();
+
+ ClientMessage message = session0.createClientMessage(false);
+ SimpleString propKey = new SimpleString("propkey");
+ SimpleString propVal = new SimpleString("propval");
+ message.putStringProperty(propKey, propVal);
+ message.getBody().flip();
+
+ //Now we will simulate a failure of the message flow connection between server1 and server2
+ //And prevent reconnection for a few tries, then it will failover
+ MessageFlow flow = service0.getServer().getClusterManager().getMessageFlows().get(flowName);
+ Forwarder forwarder = flow.getForwarders().iterator().next();
+ RemotingConnection forwardingConnection = ((ForwarderImpl)forwarder).getForwardingConnection();
+ InVMConnector.failOnCreateConnection = true;
+ InVMConnector.numberOfFailures = retriesBeforeFailover;
+ forwardingConnection.fail(new MessagingException(MessagingException.NOT_CONNECTED));
+
+ prod0.send(message);
+
+ ClientMessage r1 = cons1.receive(2000);
+ assertNotNull(r1);
+ assertEquals(propVal, r1.getProperty(propKey));
+
+ session0.close();
+ session2.close();
+
+ service0.stop();
+ service1.stop();
+ service2.stop();
+
+ assertEquals(0, service0.getServer().getRemotingService().getConnections().size());
+ assertEquals(0, service1.getServer().getRemotingService().getConnections().size());
+ assertEquals(0, service2.getServer().getRemotingService().getConnections().size());
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ InVMConnector.resetFailures();
+
+ assertEquals(0, InVMRegistry.instance.size());
+ }
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowTestBase.java 2008-12-05 17:26:32 UTC (rev 5468)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowTestBase.java 2008-12-07 14:27:39 UTC (rev 5469)
@@ -20,12 +20,13 @@
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
-
package org.jboss.messaging.tests.integration.cluster.distribution;
import java.util.HashMap;
import java.util.Map;
+import junit.framework.TestCase;
+
import org.jboss.messaging.core.config.Configuration;
import org.jboss.messaging.core.config.TransportConfiguration;
import org.jboss.messaging.core.config.impl.ConfigurationImpl;
@@ -33,8 +34,6 @@
import org.jboss.messaging.core.server.MessagingService;
import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
-import junit.framework.TestCase;
-
/**
* A MessageFlowTestBase
*
@@ -46,7 +45,7 @@
*/
public abstract class MessageFlowTestBase extends TestCase
{
- protected MessagingService createMessagingService(final int id, Map<String, Object> params)
+ protected MessagingService createMessagingService(final int id, final Map<String, Object> params)
{
Configuration serviceConf = new ConfigurationImpl();
serviceConf.setClustered(true);
@@ -59,6 +58,20 @@
return service;
}
+ protected MessagingService createMessagingService(final int id, final Map<String, Object> params, final boolean backup)
+ {
+ Configuration serviceConf = new ConfigurationImpl();
+ serviceConf.setClustered(true);
+ serviceConf.setSecurityEnabled(false);
+ serviceConf.setBackup(backup);
+ params.put(TransportConstants.SERVER_ID_PROP_NAME, id);
+ serviceConf.getAcceptorConfigurations()
+ .add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory",
+ params));
+ MessagingService service = MessagingServiceImpl.newNullStorageMessagingServer(serviceConf);
+ return service;
+ }
+
protected MessagingService createMessagingService(final int id)
{
return this.createMessagingService(id, new HashMap<String, Object>());
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowTransformerTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowTransformerTest.java 2008-12-05 17:26:32 UTC (rev 5468)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowTransformerTest.java 2008-12-07 14:27:39 UTC (rev 5469)
@@ -22,6 +22,11 @@
package org.jboss.messaging.tests.integration.cluster.distribution;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_AFTER_FAILOVER;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_BEFORE_FAILOVER;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL_MULTIPLIER;
+
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -95,6 +100,10 @@
1,
-1,
"org.jboss.messaging.tests.integration.cluster.distribution.SimpleTransformer",
+ DEFAULT_RETRY_INTERVAL,
+ DEFAULT_RETRY_INTERVAL_MULTIPLIER,
+ DEFAULT_MAX_RETRIES_BEFORE_FAILOVER,
+ DEFAULT_MAX_RETRIES_AFTER_FAILOVER,
connectorNames);
Set<MessageFlowConfiguration> ofconfigs = new HashSet<MessageFlowConfiguration>();
ofconfigs.add(ofconfig);
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowWildcardTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowWildcardTest.java 2008-12-05 17:26:32 UTC (rev 5468)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowWildcardTest.java 2008-12-07 14:27:39 UTC (rev 5469)
@@ -22,6 +22,11 @@
package org.jboss.messaging.tests.integration.cluster.distribution;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_AFTER_FAILOVER;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_BEFORE_FAILOVER;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL_MULTIPLIER;
+
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -103,6 +108,10 @@
1,
-1,
null,
+ DEFAULT_RETRY_INTERVAL,
+ DEFAULT_RETRY_INTERVAL_MULTIPLIER,
+ DEFAULT_MAX_RETRIES_BEFORE_FAILOVER,
+ DEFAULT_MAX_RETRIES_AFTER_FAILOVER,
connectorNames);
Set<MessageFlowConfiguration> ofconfigs = new HashSet<MessageFlowConfiguration>();
ofconfigs.add(ofconfig);
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowWithFilterTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowWithFilterTest.java 2008-12-05 17:26:32 UTC (rev 5468)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowWithFilterTest.java 2008-12-07 14:27:39 UTC (rev 5469)
@@ -22,6 +22,11 @@
package org.jboss.messaging.tests.integration.cluster.distribution;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_AFTER_FAILOVER;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_BEFORE_FAILOVER;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL_MULTIPLIER;
+
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -97,6 +102,10 @@
1,
-1,
null,
+ DEFAULT_RETRY_INTERVAL,
+ DEFAULT_RETRY_INTERVAL_MULTIPLIER,
+ DEFAULT_MAX_RETRIES_BEFORE_FAILOVER,
+ DEFAULT_MAX_RETRIES_AFTER_FAILOVER,
connectorNames);
Set<MessageFlowConfiguration> ofconfigs = new HashSet<MessageFlowConfiguration>();
ofconfigs.add(ofconfig);
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/StaticFlowTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/StaticFlowTest.java 2008-12-05 17:26:32 UTC (rev 5468)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/StaticFlowTest.java 2008-12-07 14:27:39 UTC (rev 5469)
@@ -22,6 +22,11 @@
package org.jboss.messaging.tests.integration.cluster.distribution;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_AFTER_FAILOVER;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_BEFORE_FAILOVER;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL_MULTIPLIER;
+
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -36,8 +41,6 @@
import org.jboss.messaging.core.client.ClientSessionFactory;
import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
import org.jboss.messaging.core.config.TransportConfiguration;
-import org.jboss.messaging.core.config.cluster.BroadcastGroupConfiguration;
-import org.jboss.messaging.core.config.cluster.DiscoveryGroupConfiguration;
import org.jboss.messaging.core.config.cluster.MessageFlowConfiguration;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
@@ -129,6 +132,10 @@
1,
-1,
null,
+ DEFAULT_RETRY_INTERVAL,
+ DEFAULT_RETRY_INTERVAL_MULTIPLIER,
+ DEFAULT_MAX_RETRIES_BEFORE_FAILOVER,
+ DEFAULT_MAX_RETRIES_AFTER_FAILOVER,
connectorNames);
Set<MessageFlowConfiguration> ofconfigs = new HashSet<MessageFlowConfiguration>();
ofconfigs.add(ofconfig);
@@ -290,6 +297,10 @@
1,
-1,
null,
+ DEFAULT_RETRY_INTERVAL,
+ DEFAULT_RETRY_INTERVAL_MULTIPLIER,
+ DEFAULT_MAX_RETRIES_BEFORE_FAILOVER,
+ DEFAULT_MAX_RETRIES_AFTER_FAILOVER,
connectorNames);
Set<MessageFlowConfiguration> ofconfigs = new HashSet<MessageFlowConfiguration>();
ofconfigs.add(ofconfig);
@@ -466,6 +477,10 @@
1,
-1,
null,
+ DEFAULT_RETRY_INTERVAL,
+ DEFAULT_RETRY_INTERVAL_MULTIPLIER,
+ DEFAULT_MAX_RETRIES_BEFORE_FAILOVER,
+ DEFAULT_MAX_RETRIES_AFTER_FAILOVER,
connectorNames1);
MessageFlowConfiguration ofconfig2 = new MessageFlowConfiguration("flow2",
testAddress.toString(),
@@ -474,6 +489,10 @@
1,
-1,
null,
+ DEFAULT_RETRY_INTERVAL,
+ DEFAULT_RETRY_INTERVAL_MULTIPLIER,
+ DEFAULT_MAX_RETRIES_BEFORE_FAILOVER,
+ DEFAULT_MAX_RETRIES_AFTER_FAILOVER,
connectorNames2);
MessageFlowConfiguration ofconfig3 = new MessageFlowConfiguration("flow3",
testAddress.toString(),
@@ -482,6 +501,10 @@
1,
-1,
null,
+ DEFAULT_RETRY_INTERVAL,
+ DEFAULT_RETRY_INTERVAL_MULTIPLIER,
+ DEFAULT_MAX_RETRIES_BEFORE_FAILOVER,
+ DEFAULT_MAX_RETRIES_AFTER_FAILOVER,
connectorNames3);
MessageFlowConfiguration ofconfig4 = new MessageFlowConfiguration("flow4",
testAddress.toString(),
@@ -490,6 +513,10 @@
1,
-1,
null,
+ DEFAULT_RETRY_INTERVAL,
+ DEFAULT_RETRY_INTERVAL_MULTIPLIER,
+ DEFAULT_MAX_RETRIES_BEFORE_FAILOVER,
+ DEFAULT_MAX_RETRIES_AFTER_FAILOVER,
connectorNames4);
Set<MessageFlowConfiguration> ofconfigs = new HashSet<MessageFlowConfiguration>();
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailureListenerOnFailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailureListenerOnFailoverTest.java 2008-12-05 17:26:32 UTC (rev 5468)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailureListenerOnFailoverTest.java 2008-12-07 14:27:39 UTC (rev 5469)
@@ -43,7 +43,6 @@
import org.jboss.messaging.core.remoting.impl.invm.TransportConstants;
import org.jboss.messaging.core.server.MessagingService;
import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
-import org.jboss.messaging.util.SimpleString;
/**
*
@@ -65,8 +64,6 @@
// Attributes ----------------------------------------------------
- private static final SimpleString ADDRESS = new SimpleString("FailoverTestAddress");
-
private MessagingService liveService;
private MessagingService backupService;
More information about the jboss-cvs-commits
mailing list