[jboss-cvs] JBoss Messaging SVN: r5581 - in trunk: src/main/org/jboss/messaging/core/config/impl and 9 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Jan 6 10:48:59 EST 2009
Author: timfox
Date: 2009-01-06 10:48:57 -0500 (Tue, 06 Jan 2009)
New Revision: 5581
Modified:
trunk/src/main/org/jboss/messaging/core/config/cluster/MessageFlowConfiguration.java
trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java
trunk/src/main/org/jboss/messaging/core/management/impl/QueueControl.java
trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java
trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingsImpl.java
trunk/src/main/org/jboss/messaging/core/server/Bindable.java
trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java
trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ForwarderImpl.java
trunk/src/main/org/jboss/messaging/core/server/cluster/impl/MessageFlowImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/LinkImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/BasicMessageFlowTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/DiscoveryFlowTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowBatchSizeTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowBatchTimeTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowReconnectTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowRestartTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowTransformerTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowWildcardTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowWithFilterTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/StaticFlowTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/management/MessageFlowControlTest.java
trunk/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java
trunk/tests/src/org/jboss/messaging/tests/util/UnitTestCase.java
Log:
Clustering max hops
Modified: trunk/src/main/org/jboss/messaging/core/config/cluster/MessageFlowConfiguration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/cluster/MessageFlowConfiguration.java 2009-01-06 09:37:08 UTC (rev 5580)
+++ trunk/src/main/org/jboss/messaging/core/config/cluster/MessageFlowConfiguration.java 2009-01-06 15:48:57 UTC (rev 5581)
@@ -67,6 +67,8 @@
private final int maxRetriesAfterFailover;
private final boolean useDuplicateDetection;
+
+ private final int maxHops;
public MessageFlowConfiguration(final String name,
final String address,
@@ -80,6 +82,7 @@
final int maxRetriesBeforeFailover,
final int maxRetriesAfterFailover,
final boolean useDuplicateDetection,
+ final int maxHops,
final List<Pair<String, String>> staticConnectorNamePairs)
{
this.name = name;
@@ -94,6 +97,7 @@
this.maxRetriesBeforeFailover = maxRetriesBeforeFailover;
this.maxRetriesAfterFailover = maxRetriesAfterFailover;
this.useDuplicateDetection = useDuplicateDetection;
+ this.maxHops = maxHops;
this.staticConnectorNamePairs = staticConnectorNamePairs;
this.discoveryGroupName = null;
}
@@ -110,6 +114,7 @@
final int maxRetriesBeforeFailover,
final int maxRetriesAfterFailover,
final boolean useDuplicateDetection,
+ final int maxHops,
final String discoveryGroupName)
{
this.name = name;
@@ -124,6 +129,7 @@
this.maxRetriesBeforeFailover = maxRetriesBeforeFailover;
this.maxRetriesAfterFailover = maxRetriesAfterFailover;
this.useDuplicateDetection = useDuplicateDetection;
+ this.maxHops = maxHops;
this.staticConnectorNamePairs = null;
this.discoveryGroupName = discoveryGroupName;
}
@@ -197,4 +203,9 @@
{
return useDuplicateDetection;
}
+
+ public int getMaxHops()
+ {
+ return maxHops;
+ }
}
Modified: trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java 2009-01-06 09:37:08 UTC (rev 5580)
+++ trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java 2009-01-06 15:48:57 UTC (rev 5581)
@@ -116,6 +116,8 @@
public static final boolean DEFAULT_PERSIST_ID_CACHE = true;
public static final boolean DEFAULT_USE_DUPLICATE_DETECTION = true;
+
+ public static final int DEFAULT_MAX_HOPS = 1;
// Attributes -----------------------------------------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java 2009-01-06 09:37:08 UTC (rev 5580)
+++ trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java 2009-01-06 15:48:57 UTC (rev 5581)
@@ -535,6 +535,8 @@
int maxRetriesAfterFailover = DEFAULT_MAX_RETRIES_AFTER_FAILOVER;
boolean useDuplicateDetection = DEFAULT_USE_DUPLICATE_DETECTION;
+
+ int maxHops = DEFAULT_MAX_HOPS;
NodeList children = bgNode.getChildNodes();
@@ -590,6 +592,10 @@
{
useDuplicateDetection = XMLUtil.parseBoolean(child);
}
+ else if (child.getNodeName().equals("max-hops"))
+ {
+ maxHops = XMLUtil.parseInt(child);
+ }
else if (child.getNodeName().equals("connector-ref"))
{
String connectorName = child.getAttributes().getNamedItem("connector-name").getNodeValue();
@@ -623,6 +629,7 @@
maxRetriesBeforeFailover,
maxRetriesAfterFailover,
useDuplicateDetection,
+ maxHops,
staticConnectorNames);
}
else
@@ -639,6 +646,7 @@
maxRetriesBeforeFailover,
maxRetriesAfterFailover,
useDuplicateDetection,
+ maxHops,
discoveryGroupName);
}
Modified: trunk/src/main/org/jboss/messaging/core/management/impl/QueueControl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/impl/QueueControl.java 2009-01-06 09:37:08 UTC (rev 5580)
+++ trunk/src/main/org/jboss/messaging/core/management/impl/QueueControl.java 2009-01-06 15:48:57 UTC (rev 5581)
@@ -96,6 +96,7 @@
public String getFilter()
{
Filter filter = queue.getFilter();
+
return (filter != null) ? filter.getFilterString().toString() : null;
}
Modified: trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java 2009-01-06 09:37:08 UTC (rev 5580)
+++ trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java 2009-01-06 15:48:57 UTC (rev 5581)
@@ -66,6 +66,8 @@
public static final SimpleString HDR_SCHEDULED_DELIVERY_TIME = new SimpleString("JBM_SCHED_DELIVERY");
public static final SimpleString HDR_DUPLICATE_DETECTION_ID = new SimpleString("JBM_DUPL_ID");
+
+ public static final SimpleString HDR_MAX_HOPS = new SimpleString("JBM_MAX_HOPS");
// Attributes ----------------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingsImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingsImpl.java 2009-01-06 09:37:08 UTC (rev 5580)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingsImpl.java 2009-01-06 15:48:57 UTC (rev 5581)
@@ -95,14 +95,7 @@
{
// We need to round robin
- Binding binding = getNext(message);
-
- if (binding != null)
- {
- Bindable bindable = binding.getBindable();
-
- bindable.route(message, tx);
- }
+ routeRoundRobin(message, tx);
}
else
{
@@ -129,14 +122,12 @@
}
}
- private Binding getNext(final ServerMessage message)
+ private void routeRoundRobin(final ServerMessage message, final Transaction tx) throws Exception
{
//It's not an exact round robin under concurrent access but that doesn't matter
int startPos = -1;
-
- Binding ret = binding;
-
+
while (true)
{
try
@@ -147,36 +138,34 @@
{
binding = bindings.get(thePos);
- ret = binding;
-
weightCount.set(binding.getWeight());
}
- Filter filter = binding.getBindable().getFilter();
-
- if ((filter != null && !filter.match(message)) || weightCount.get() == 0)
+ if (weightCount.get() != 0)
{
- if (thePos == startPos)
+ if (binding.getBindable().route(message, tx))
{
- //Tried them all
- return null;
+ if (weightCount.decrementAndGet() <= 0)
+ {
+ advance();
+ }
+
+ return;
}
-
- if (startPos == -1)
- {
- startPos = thePos;
- }
-
- advance();
-
- continue;
}
- else if (weightCount.decrementAndGet() <= 0)
+
+ if (thePos == startPos)
{
- advance();
+ //Tried them all
+ return;
}
-
- break;
+
+ if (startPos == -1)
+ {
+ startPos = thePos;
+ }
+
+ advance();
}
catch (IndexOutOfBoundsException e)
{
@@ -184,7 +173,7 @@
if (bindings.isEmpty())
{
- return null;
+ return;
}
else
{
@@ -194,7 +183,6 @@
}
}
}
- return ret;
}
private void advance()
Modified: trunk/src/main/org/jboss/messaging/core/server/Bindable.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/Bindable.java 2009-01-06 09:37:08 UTC (rev 5580)
+++ trunk/src/main/org/jboss/messaging/core/server/Bindable.java 2009-01-06 15:48:57 UTC (rev 5581)
@@ -46,7 +46,7 @@
void setPersistenceID(long id);
- void route(ServerMessage message, Transaction tx) throws Exception;
+ boolean route(ServerMessage message, Transaction tx) throws Exception;
boolean isDurable();
}
Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java 2009-01-06 09:37:08 UTC (rev 5580)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java 2009-01-06 15:48:57 UTC (rev 5581)
@@ -383,6 +383,7 @@
config.getMaxRetriesBeforeFailover(),
config.getMaxRetriesAfterFailover(),
config.isUseDuplicateDetection(),
+ config.getMaxHops(),
conns);
}
else
@@ -417,6 +418,7 @@
config.getMaxRetriesBeforeFailover(),
config.getMaxRetriesAfterFailover(),
config.isUseDuplicateDetection(),
+ config.getMaxHops(),
group);
}
Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ForwarderImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ForwarderImpl.java 2009-01-06 09:37:08 UTC (rev 5580)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ForwarderImpl.java 2009-01-06 15:48:57 UTC (rev 5581)
@@ -108,6 +108,8 @@
private final ScheduledFuture<?> future;
+ private final int maxHops;
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -127,7 +129,8 @@
final long retryInterval,
final double retryIntervalMultiplier,
final int maxRetriesBeforeFailover,
- final int maxRetriesAfterFailover)
+ final int maxRetriesAfterFailover,
+ final int maxHops)
{
this.queue = queue;
@@ -145,6 +148,8 @@
this.transformer = transformer;
+ this.maxHops = maxHops;
+
this.csf = new ClientSessionFactoryImpl(connectorPair.a,
connectorPair.b,
retryInterval,
@@ -166,26 +171,28 @@
}
public synchronized void start() throws Exception
- {
+ {
if (started)
{
return;
}
+ //log.info("starting forwarder");
+
queue.addConsumer(this);
createTx();
if (createObjects())
- {
+ {
started = true;
-
+
queue.deliverAsync(executor);
}
}
public synchronized void stop() throws Exception
- {
+ {
started = false;
queue.removeConsumer(this);
@@ -220,8 +227,8 @@
{
return started;
}
-
- //For testing only
+
+ // For testing only
public RemotingConnection getForwardingConnection()
{
return ((ClientSessionImpl)session).getConnection();
@@ -230,7 +237,7 @@
// Consumer implementation ---------------------------------------
public HandleStatus handle(final MessageReference reference) throws Exception
- {
+ {
if (busy)
{
return HandleStatus.BUSY;
@@ -242,9 +249,9 @@
{
return HandleStatus.BUSY;
}
-
+
reference.getQueue().referenceHandled();
-
+
refs.add(reference);
if (maxBatchTime != -1)
@@ -268,21 +275,21 @@
// FailureListener implementation --------------------------------
public synchronized boolean connectionFailed(final MessagingException me)
- {
- //By the time this is called
+ {
+ // By the time this is called
synchronized (this)
- {
+ {
try
{
session.close();
-
+
createObjects();
}
catch (Exception e)
{
log.error("Failed to reconnect", e);
}
-
+
return true;
}
}
@@ -302,16 +309,16 @@
catch (MessagingException me)
{
log.warn("Unable to connect. Message flow is now disabled.");
-
+
stop();
-
+
return false;
}
session.addFailureListener(this);
- producer = session.createProducer(null);
-
+ producer = session.createProducer(null);
+
return true;
}
@@ -341,7 +348,7 @@
{
return;
}
-
+
// TODO - if batch size = 1 then don't need tx
while (true)
@@ -354,29 +361,47 @@
}
ref.acknowledge(tx, storageManager, postOffice, queueSettingsRepository);
-
+
ServerMessage message = ref.getMessage();
if (transformer != null)
{
message = transformer.transform(message);
}
-
+
SimpleString forwardingDestination = (SimpleString)message.getProperty(MessageImpl.HDR_ORIGIN_QUEUE);
- producer.send(forwardingDestination, message);
- }
+ if (maxHops != -1)
+ {
+ Integer iMaxHops = (Integer)message.getProperty(MessageImpl.HDR_MAX_HOPS);
+ if (iMaxHops == null)
+ {
+ message.putIntProperty(MessageImpl.HDR_MAX_HOPS, maxHops - 1);
+
+ // log.info("In forwarder, putting maxhops " + (maxHops - 1));
+ }
+ else
+ {
+ message.putIntProperty(MessageImpl.HDR_MAX_HOPS, iMaxHops - 1);
+
+ // log.info("In forwarder, putting maxhops " + (iMaxHops - 1));
+ }
+ }
+
+ producer.send(forwardingDestination, message);
+ }
+
session.commit();
tx.commit();
-
+
createTx();
busy = false;
count = 0;
-
+
queue.deliverAsync(executor);
}
catch (Exception e)
Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/MessageFlowImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/MessageFlowImpl.java 2009-01-06 09:37:08 UTC (rev 5580)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/MessageFlowImpl.java 2009-01-06 15:48:57 UTC (rev 5581)
@@ -105,6 +105,8 @@
private final int maxRetriesAfterFailover;
private final boolean useDuplicateDetection;
+
+ private final int maxHops;
/*
* Constructor using static list of connectors
@@ -126,6 +128,7 @@
final int maxRetriesBeforeFailover,
final int maxRetriesAfterFailover,
final boolean useDuplicateDetection,
+ final int maxHops,
final List<Pair<TransportConfiguration, TransportConfiguration>> connectors) throws Exception
{
this.name = name;
@@ -163,6 +166,8 @@
this.maxRetriesAfterFailover = maxRetriesAfterFailover;
this.useDuplicateDetection = useDuplicateDetection;
+
+ this.maxHops = maxHops;
this.updateConnectors(connectors);
}
@@ -187,6 +192,7 @@
final int maxRetriesBeforeFailover,
final int maxRetriesAfterFailover,
final boolean useDuplicateDetection,
+ final int maxHops,
final DiscoveryGroup discoveryGroup) throws Exception
{
this.name = name;
@@ -224,6 +230,8 @@
this.maxRetriesAfterFailover = maxRetriesAfterFailover;
this.useDuplicateDetection = useDuplicateDetection;
+
+ this.maxHops = maxHops;
}
public synchronized void start() throws Exception
@@ -397,7 +405,8 @@
retryInterval,
retryIntervalMultiplier,
maxRetriesBeforeFailover,
- maxRetriesAfterFailover);
+ maxRetriesAfterFailover,
+ maxHops);
forwarders.put(connectorPair, forwarder);
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/LinkImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/LinkImpl.java 2009-01-06 09:37:08 UTC (rev 5580)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/LinkImpl.java 2009-01-06 15:48:57 UTC (rev 5581)
@@ -90,10 +90,29 @@
this.storageManager = storageManager;
}
- public void route(final ServerMessage message, final Transaction tx) throws Exception
+ public boolean route(final ServerMessage message, final Transaction tx) throws Exception
{
+ if (filter != null && !filter.match(message))
+ {
+ return false;
+ }
+
+ Integer iMaxHops = (Integer)message.getProperty(MessageImpl.HDR_MAX_HOPS);
+
+ // log.info("IN LinkIMpl::route imaxhops is " + iMaxHops);
+
+ if (iMaxHops != null)
+ {
+ int maxHops = iMaxHops.intValue();
+
+ if (maxHops <= 0)
+ {
+ return false;
+ }
+ }
+
ServerMessage copy = message.copy();
-
+
copy.setMessageID(storageManager.generateUniqueID());
SimpleString originalDestination = copy.getDestination();
@@ -118,6 +137,8 @@
}
postOffice.route(copy, tx);
+
+ return true;
}
public Filter getFilter()
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2009-01-06 09:37:08 UTC (rev 5580)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2009-01-06 15:48:57 UTC (rev 5581)
@@ -153,8 +153,13 @@
// Bindable implementation -------------------------------------------------------------------------------------
- public void route(final ServerMessage message, Transaction tx) throws Exception
+ public boolean route(final ServerMessage message, Transaction tx) throws Exception
{
+ if (filter != null && !filter.match(message))
+ {
+ return false;
+ }
+
SimpleString duplicateID = (SimpleString)message.getProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID);
DuplicateIDCache cache = null;
@@ -176,7 +181,7 @@
tx.markAsRollbackOnly(null);
}
- return;
+ return true;
}
}
@@ -326,6 +331,8 @@
{
tx.commit();
}
+
+ return true;
}
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/BasicMessageFlowTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/BasicMessageFlowTest.java 2009-01-06 09:37:08 UTC (rev 5580)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/BasicMessageFlowTest.java 2009-01-06 15:48:57 UTC (rev 5581)
@@ -26,6 +26,7 @@
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_BEFORE_FAILOVER;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL_MULTIPLIER;
+import static org.jboss.messaging.core.config.impl.ConfigurationImpl.DEFAULT_MAX_HOPS;
import static org.jboss.messaging.core.config.impl.ConfigurationImpl.DEFAULT_USE_DUPLICATE_DETECTION;
import java.util.ArrayList;
@@ -43,6 +44,7 @@
import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
import org.jboss.messaging.core.config.TransportConfiguration;
import org.jboss.messaging.core.config.cluster.MessageFlowConfiguration;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
import org.jboss.messaging.core.server.MessagingService;
@@ -109,6 +111,7 @@
DEFAULT_MAX_RETRIES_BEFORE_FAILOVER,
DEFAULT_MAX_RETRIES_AFTER_FAILOVER,
DEFAULT_USE_DUPLICATE_DETECTION,
+ DEFAULT_MAX_HOPS,
connectorNames);
MessageFlowConfiguration ofconfig2 = new MessageFlowConfiguration("flow1",
address1.toString(),
@@ -122,6 +125,7 @@
DEFAULT_MAX_RETRIES_BEFORE_FAILOVER,
DEFAULT_MAX_RETRIES_AFTER_FAILOVER,
DEFAULT_USE_DUPLICATE_DETECTION,
+ DEFAULT_MAX_HOPS,
connectorNames);
Set<MessageFlowConfiguration> ofconfigs = new HashSet<MessageFlowConfiguration>();
@@ -214,6 +218,7 @@
DEFAULT_MAX_RETRIES_BEFORE_FAILOVER,
DEFAULT_MAX_RETRIES_AFTER_FAILOVER,
DEFAULT_USE_DUPLICATE_DETECTION,
+ DEFAULT_MAX_HOPS,
connectorNames);
Set<MessageFlowConfiguration> ofconfigs = new HashSet<MessageFlowConfiguration>();
@@ -293,6 +298,7 @@
DEFAULT_MAX_RETRIES_BEFORE_FAILOVER,
DEFAULT_MAX_RETRIES_AFTER_FAILOVER,
DEFAULT_USE_DUPLICATE_DETECTION,
+ DEFAULT_MAX_HOPS,
connectorNames);
Set<MessageFlowConfiguration> ofconfigs = new HashSet<MessageFlowConfiguration>();
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/DiscoveryFlowTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/DiscoveryFlowTest.java 2009-01-06 09:37:08 UTC (rev 5580)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/DiscoveryFlowTest.java 2009-01-06 15:48:57 UTC (rev 5581)
@@ -26,6 +26,7 @@
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_BEFORE_FAILOVER;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL_MULTIPLIER;
+import static org.jboss.messaging.core.config.impl.ConfigurationImpl.DEFAULT_MAX_HOPS;
import static org.jboss.messaging.core.config.impl.ConfigurationImpl.DEFAULT_USE_DUPLICATE_DETECTION;
import java.util.ArrayList;
@@ -204,6 +205,7 @@
DEFAULT_MAX_RETRIES_BEFORE_FAILOVER,
DEFAULT_MAX_RETRIES_AFTER_FAILOVER,
DEFAULT_USE_DUPLICATE_DETECTION,
+ DEFAULT_MAX_HOPS,
discoveryGroupName);
Set<MessageFlowConfiguration> ofconfigs = new HashSet<MessageFlowConfiguration>();
ofconfigs.add(ofconfig);
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowBatchSizeTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowBatchSizeTest.java 2009-01-06 09:37:08 UTC (rev 5580)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowBatchSizeTest.java 2009-01-06 15:48:57 UTC (rev 5581)
@@ -26,6 +26,7 @@
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_BEFORE_FAILOVER;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL_MULTIPLIER;
+import static org.jboss.messaging.core.config.impl.ConfigurationImpl.DEFAULT_MAX_HOPS;
import static org.jboss.messaging.core.config.impl.ConfigurationImpl.DEFAULT_USE_DUPLICATE_DETECTION;
import java.util.ArrayList;
@@ -108,6 +109,7 @@
DEFAULT_MAX_RETRIES_BEFORE_FAILOVER,
DEFAULT_MAX_RETRIES_AFTER_FAILOVER,
DEFAULT_USE_DUPLICATE_DETECTION,
+ DEFAULT_MAX_HOPS,
connectorNames);
Set<MessageFlowConfiguration> ofconfigs = new HashSet<MessageFlowConfiguration>();
ofconfigs.add(ofconfig);
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowBatchTimeTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowBatchTimeTest.java 2009-01-06 09:37:08 UTC (rev 5580)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowBatchTimeTest.java 2009-01-06 15:48:57 UTC (rev 5581)
@@ -26,6 +26,7 @@
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_BEFORE_FAILOVER;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL_MULTIPLIER;
+import static org.jboss.messaging.core.config.impl.ConfigurationImpl.DEFAULT_MAX_HOPS;
import static org.jboss.messaging.core.config.impl.ConfigurationImpl.DEFAULT_USE_DUPLICATE_DETECTION;
import java.util.ArrayList;
@@ -109,6 +110,7 @@
DEFAULT_MAX_RETRIES_BEFORE_FAILOVER,
DEFAULT_MAX_RETRIES_AFTER_FAILOVER,
DEFAULT_USE_DUPLICATE_DETECTION,
+ DEFAULT_MAX_HOPS,
connectorNames);
Set<MessageFlowConfiguration> ofconfigs = new HashSet<MessageFlowConfiguration>();
ofconfigs.add(ofconfig);
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowReconnectTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowReconnectTest.java 2009-01-06 09:37:08 UTC (rev 5580)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowReconnectTest.java 2009-01-06 15:48:57 UTC (rev 5581)
@@ -22,6 +22,7 @@
package org.jboss.messaging.tests.integration.cluster.distribution;
+import static org.jboss.messaging.core.config.impl.ConfigurationImpl.DEFAULT_MAX_HOPS;
import static org.jboss.messaging.core.config.impl.ConfigurationImpl.DEFAULT_USE_DUPLICATE_DETECTION;
import java.util.ArrayList;
@@ -132,6 +133,7 @@
retriesBeforeFailover,
maxRetriesAfterFailover,
DEFAULT_USE_DUPLICATE_DETECTION,
+ DEFAULT_MAX_HOPS,
connectorNames);
Set<MessageFlowConfiguration> ofconfigs = new HashSet<MessageFlowConfiguration>();
@@ -263,6 +265,7 @@
retriesBeforeFailover,
maxRetriesAfterFailover,
DEFAULT_USE_DUPLICATE_DETECTION,
+ DEFAULT_MAX_HOPS,
connectorNames);
Set<MessageFlowConfiguration> ofconfigs = new HashSet<MessageFlowConfiguration>();
@@ -398,6 +401,7 @@
retriesBeforeFailover,
maxRetriesAfterFailover,
DEFAULT_USE_DUPLICATE_DETECTION,
+ DEFAULT_MAX_HOPS,
connectorNames);
Set<MessageFlowConfiguration> ofconfigs = new HashSet<MessageFlowConfiguration>();
@@ -519,6 +523,7 @@
retriesBeforeFailover,
maxRetriesAfterFailover,
DEFAULT_USE_DUPLICATE_DETECTION,
+ DEFAULT_MAX_HOPS,
connectorNames);
Set<MessageFlowConfiguration> ofconfigs = new HashSet<MessageFlowConfiguration>();
@@ -629,6 +634,7 @@
retriesBeforeFailover,
maxRetriesAfterFailover,
DEFAULT_USE_DUPLICATE_DETECTION,
+ DEFAULT_MAX_HOPS,
connectorNames);
Set<MessageFlowConfiguration> ofconfigs = new HashSet<MessageFlowConfiguration>();
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowRestartTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowRestartTest.java 2009-01-06 09:37:08 UTC (rev 5580)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowRestartTest.java 2009-01-06 15:48:57 UTC (rev 5581)
@@ -24,6 +24,7 @@
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL_MULTIPLIER;
+import static org.jboss.messaging.core.config.impl.ConfigurationImpl.DEFAULT_MAX_HOPS;
import static org.jboss.messaging.core.config.impl.ConfigurationImpl.DEFAULT_USE_DUPLICATE_DETECTION;
import java.util.ArrayList;
@@ -41,7 +42,6 @@
import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
import org.jboss.messaging.core.config.TransportConfiguration;
import org.jboss.messaging.core.config.cluster.MessageFlowConfiguration;
-import org.jboss.messaging.core.config.impl.ConfigurationImpl;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
import org.jboss.messaging.core.remoting.impl.invm.TransportConstants;
@@ -77,11 +77,11 @@
public void testRestartOutflow() throws Exception
{
Map<String, Object> service0Params = new HashMap<String, Object>();
- MessagingService service0 = createClusteredServiceWithParams(true, service0Params);
+ MessagingService service0 = createClusteredServiceWithParams(0, true, service0Params);
Map<String, Object> service1Params = new HashMap<String, Object>();
service1Params.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
- MessagingService service1 = createClusteredServiceWithParams(true, service1Params);
+ MessagingService service1 = createClusteredServiceWithParams(1, true, service1Params);
//We don't start server 1 at this point
@@ -111,6 +111,7 @@
0,
0,
DEFAULT_USE_DUPLICATE_DETECTION,
+ DEFAULT_MAX_HOPS,
connectorNames);
Set<MessageFlowConfiguration> ofconfigs = new HashSet<MessageFlowConfiguration>();
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowTransformerTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowTransformerTest.java 2009-01-06 09:37:08 UTC (rev 5580)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowTransformerTest.java 2009-01-06 15:48:57 UTC (rev 5581)
@@ -26,6 +26,7 @@
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_BEFORE_FAILOVER;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL_MULTIPLIER;
+import static org.jboss.messaging.core.config.impl.ConfigurationImpl.DEFAULT_MAX_HOPS;
import static org.jboss.messaging.core.config.impl.ConfigurationImpl.DEFAULT_USE_DUPLICATE_DETECTION;
import java.util.ArrayList;
@@ -106,6 +107,7 @@
DEFAULT_MAX_RETRIES_BEFORE_FAILOVER,
DEFAULT_MAX_RETRIES_AFTER_FAILOVER,
DEFAULT_USE_DUPLICATE_DETECTION,
+ DEFAULT_MAX_HOPS,
connectorNames);
Set<MessageFlowConfiguration> ofconfigs = new HashSet<MessageFlowConfiguration>();
ofconfigs.add(ofconfig);
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowWildcardTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowWildcardTest.java 2009-01-06 09:37:08 UTC (rev 5580)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowWildcardTest.java 2009-01-06 15:48:57 UTC (rev 5581)
@@ -26,6 +26,7 @@
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_BEFORE_FAILOVER;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL_MULTIPLIER;
+import static org.jboss.messaging.core.config.impl.ConfigurationImpl.DEFAULT_MAX_HOPS;
import static org.jboss.messaging.core.config.impl.ConfigurationImpl.DEFAULT_USE_DUPLICATE_DETECTION;
import java.util.ArrayList;
@@ -114,6 +115,7 @@
DEFAULT_MAX_RETRIES_BEFORE_FAILOVER,
DEFAULT_MAX_RETRIES_AFTER_FAILOVER,
DEFAULT_USE_DUPLICATE_DETECTION,
+ DEFAULT_MAX_HOPS,
connectorNames);
Set<MessageFlowConfiguration> ofconfigs = new HashSet<MessageFlowConfiguration>();
ofconfigs.add(ofconfig);
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowWithFilterTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowWithFilterTest.java 2009-01-06 09:37:08 UTC (rev 5580)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowWithFilterTest.java 2009-01-06 15:48:57 UTC (rev 5581)
@@ -26,6 +26,7 @@
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_BEFORE_FAILOVER;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL_MULTIPLIER;
+import static org.jboss.messaging.core.config.impl.ConfigurationImpl.DEFAULT_MAX_HOPS;
import static org.jboss.messaging.core.config.impl.ConfigurationImpl.DEFAULT_USE_DUPLICATE_DETECTION;
import java.util.ArrayList;
@@ -108,6 +109,7 @@
DEFAULT_MAX_RETRIES_BEFORE_FAILOVER,
DEFAULT_MAX_RETRIES_AFTER_FAILOVER,
DEFAULT_USE_DUPLICATE_DETECTION,
+ DEFAULT_MAX_HOPS,
connectorNames);
Set<MessageFlowConfiguration> ofconfigs = new HashSet<MessageFlowConfiguration>();
ofconfigs.add(ofconfig);
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/StaticFlowTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/StaticFlowTest.java 2009-01-06 09:37:08 UTC (rev 5580)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/StaticFlowTest.java 2009-01-06 15:48:57 UTC (rev 5581)
@@ -26,6 +26,7 @@
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_BEFORE_FAILOVER;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL_MULTIPLIER;
+import static org.jboss.messaging.core.config.impl.ConfigurationImpl.DEFAULT_MAX_HOPS;
import static org.jboss.messaging.core.config.impl.ConfigurationImpl.DEFAULT_USE_DUPLICATE_DETECTION;
import java.util.ArrayList;
@@ -138,6 +139,7 @@
DEFAULT_MAX_RETRIES_BEFORE_FAILOVER,
DEFAULT_MAX_RETRIES_AFTER_FAILOVER,
DEFAULT_USE_DUPLICATE_DETECTION,
+ DEFAULT_MAX_HOPS,
connectorNames);
Set<MessageFlowConfiguration> ofconfigs = new HashSet<MessageFlowConfiguration>();
ofconfigs.add(ofconfig);
@@ -304,6 +306,7 @@
DEFAULT_MAX_RETRIES_BEFORE_FAILOVER,
DEFAULT_MAX_RETRIES_AFTER_FAILOVER,
DEFAULT_USE_DUPLICATE_DETECTION,
+ DEFAULT_MAX_HOPS,
connectorNames);
Set<MessageFlowConfiguration> ofconfigs = new HashSet<MessageFlowConfiguration>();
ofconfigs.add(ofconfig);
@@ -485,6 +488,7 @@
DEFAULT_MAX_RETRIES_BEFORE_FAILOVER,
DEFAULT_MAX_RETRIES_AFTER_FAILOVER,
DEFAULT_USE_DUPLICATE_DETECTION,
+ DEFAULT_MAX_HOPS,
connectorNames1);
MessageFlowConfiguration ofconfig2 = new MessageFlowConfiguration("flow2",
testAddress.toString(),
@@ -498,6 +502,7 @@
DEFAULT_MAX_RETRIES_BEFORE_FAILOVER,
DEFAULT_MAX_RETRIES_AFTER_FAILOVER,
DEFAULT_USE_DUPLICATE_DETECTION,
+ DEFAULT_MAX_HOPS,
connectorNames2);
MessageFlowConfiguration ofconfig3 = new MessageFlowConfiguration("flow3",
testAddress.toString(),
@@ -511,6 +516,7 @@
DEFAULT_MAX_RETRIES_BEFORE_FAILOVER,
DEFAULT_MAX_RETRIES_AFTER_FAILOVER,
DEFAULT_USE_DUPLICATE_DETECTION,
+ DEFAULT_MAX_HOPS,
connectorNames3);
MessageFlowConfiguration ofconfig4 = new MessageFlowConfiguration("flow4",
testAddress.toString(),
@@ -524,6 +530,7 @@
DEFAULT_MAX_RETRIES_BEFORE_FAILOVER,
DEFAULT_MAX_RETRIES_AFTER_FAILOVER,
DEFAULT_USE_DUPLICATE_DETECTION,
+ DEFAULT_MAX_HOPS,
connectorNames4);
Set<MessageFlowConfiguration> ofconfigs = new HashSet<MessageFlowConfiguration>();
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/management/MessageFlowControlTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/management/MessageFlowControlTest.java 2009-01-06 09:37:08 UTC (rev 5580)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/management/MessageFlowControlTest.java 2009-01-06 15:48:57 UTC (rev 5581)
@@ -76,6 +76,7 @@
randomPositiveInt(),
randomPositiveInt(),
randomBoolean(),
+ randomPositiveInt(),
discoveryGroupName);
}
Modified: trunk/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java 2009-01-06 09:37:08 UTC (rev 5580)
+++ trunk/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java 2009-01-06 15:48:57 UTC (rev 5581)
@@ -136,9 +136,9 @@
return createService(realFiles, configuration, new HashMap<String, QueueSettings>());
}
- protected MessagingService createClusteredServiceWithParams(final boolean realFiles, final Map<String, Object> params)
+ protected MessagingService createClusteredServiceWithParams(final int index, final boolean realFiles, final Map<String, Object> params)
{
- return createService(realFiles, createClusteredDefaultConfig(params, INVM_ACCEPTOR_FACTORY), new HashMap<String, QueueSettings>());
+ return createService(realFiles, createClusteredDefaultConfig(index, params, INVM_ACCEPTOR_FACTORY), new HashMap<String, QueueSettings>());
}
protected Configuration createDefaultConfig()
@@ -158,15 +158,37 @@
}
}
- protected Configuration createClusteredDefaultConfig(final Map<String, Object> params, final String... acceptors)
+ protected Configuration createClusteredDefaultConfig(final int index, final Map<String, Object> params, final String... acceptors)
{
- Configuration config = createDefaultConfig(params, acceptors);
+ Configuration config = createDefaultConfig(index, params, acceptors);
config.setClustered(true);
return config;
}
+
+ protected Configuration createDefaultConfig(int index, final Map<String, Object> params, final String... acceptors)
+ {
+ Configuration configuration = new ConfigurationImpl();
+ configuration.setSecurityEnabled(false);
+ configuration.setBindingsDirectory(getBindingsDir(index));
+ configuration.setJournalMinFiles(2);
+ configuration.setJournalDirectory(getJournalDir(index));
+ configuration.setJournalFileSize(100 * 1024);
+ configuration.setPagingDirectory(getPageDir(index));
+ configuration.setLargeMessagesDirectory(getLargeMessagesDir(index));
+ configuration.getAcceptorConfigurations().clear();
+
+ for (String acceptor : acceptors)
+ {
+ TransportConfiguration transportConfig = new TransportConfiguration(acceptor, params);
+ configuration.getAcceptorConfigurations().add(transportConfig);
+ }
+
+ return configuration;
+ }
+
protected Configuration createDefaultConfig(final Map<String, Object> params, final String... acceptors)
{
Configuration configuration = new ConfigurationImpl();
Modified: trunk/tests/src/org/jboss/messaging/tests/util/UnitTestCase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/util/UnitTestCase.java 2009-01-06 09:37:08 UTC (rev 5580)
+++ trunk/tests/src/org/jboss/messaging/tests/util/UnitTestCase.java 2009-01-06 15:48:57 UTC (rev 5581)
@@ -207,6 +207,11 @@
{
return testDir + "/journal";
}
+
+ protected String getJournalDir(int index)
+ {
+ return getJournalDir(testDir) + index;
+ }
/**
* @return the bindingsDir
@@ -215,7 +220,7 @@
{
return getBindingsDir(testDir);
}
-
+
/**
* @return the bindingsDir
*/
@@ -225,6 +230,14 @@
}
/**
+ * @return the bindingsDir
+ */
+ protected String getBindingsDir(int index)
+ {
+ return getBindingsDir(testDir) + index;
+ }
+
+ /**
* @return the pageDir
*/
protected String getPageDir()
@@ -239,6 +252,11 @@
{
return testDir + "/page";
}
+
+ protected String getPageDir(int index)
+ {
+ return getPageDir(testDir) + index;
+ }
/**
* @return the largeMessagesDir
@@ -255,6 +273,11 @@
{
return testDir + "/large-msg";
}
+
+ protected String getLargeMessagesDir(int index)
+ {
+ return getLargeMessagesDir(testDir) + index;
+ }
/**
* @return the clientLargeMessagesDir
More information about the jboss-cvs-commits
mailing list