[jboss-cvs] JBoss Messaging SVN: r2116 - in trunk/tests/src/org/jboss/test/messaging: core/plugin/postoffice/cluster and 1 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Jan 31 06:29:49 EST 2007
Author: ovidiu.feodorov at jboss.com
Date: 2007-01-31 06:29:48 -0500 (Wed, 31 Jan 2007)
New Revision: 2116
Added:
trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/ClusteredPersistenceServiceConfigFileJChannelFactory.java
trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/SimpleJChannelFactory.java
Removed:
trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/JGroupsUtil.java
trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/NamedJChannelFactory.java
Modified:
trunk/tests/src/org/jboss/test/messaging/core/plugin/base/PostOfficeTestBase.java
trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeTest.java
trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeWithDefaultRouterTest.java
trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultRouterTest.java
trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RecoveryTest.java
trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RedistributionWithDefaultMessagePullPolicyTest.java
trunk/tests/src/org/jboss/test/messaging/tools/jmx/ServiceContainer.java
Log:
The testing framework now uses only the JGroups stack configurations we ship with the release.
http://jira.jboss.org/jira/browse/JBMESSAGING-792
Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/base/PostOfficeTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/base/PostOfficeTestBase.java 2007-01-31 11:29:26 UTC (rev 2115)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/base/PostOfficeTestBase.java 2007-01-31 11:29:48 UTC (rev 2116)
@@ -37,13 +37,23 @@
import org.jboss.messaging.core.plugin.contract.MessageStore;
import org.jboss.messaging.core.plugin.contract.PersistenceManager;
import org.jboss.messaging.core.plugin.contract.PostOffice;
+import org.jboss.messaging.core.plugin.contract.ClusteredPostOffice;
+import org.jboss.messaging.core.plugin.contract.FailoverMapper;
import org.jboss.messaging.core.plugin.postoffice.DefaultPostOffice;
+import org.jboss.messaging.core.plugin.postoffice.cluster.MessagePullPolicy;
+import org.jboss.messaging.core.plugin.postoffice.cluster.NullMessagePullPolicy;
+import org.jboss.messaging.core.plugin.postoffice.cluster.ClusterRouterFactory;
+import org.jboss.messaging.core.plugin.postoffice.cluster.DefaultRouterFactory;
+import org.jboss.messaging.core.plugin.postoffice.cluster.DefaultFailoverMapper;
+import org.jboss.messaging.core.plugin.postoffice.cluster.DefaultClusteredPostOffice;
+import org.jboss.messaging.core.plugin.postoffice.cluster.jchannelfactory.JChannelFactory;
import org.jboss.messaging.core.tx.Transaction;
import org.jboss.messaging.core.tx.TransactionRepository;
import org.jboss.test.messaging.MessagingTestCase;
import org.jboss.test.messaging.core.SimpleConditionFactory;
import org.jboss.test.messaging.core.SimpleFilterFactory;
import org.jboss.test.messaging.core.SimpleReceiver;
+import org.jboss.test.messaging.core.plugin.postoffice.cluster.ClusteredPersistenceServiceConfigFileJChannelFactory;
import org.jboss.test.messaging.tools.ServerManagement;
import org.jboss.test.messaging.tools.jmx.ServiceContainer;
import org.jboss.test.messaging.util.CoreMessageFactory;
@@ -53,6 +63,7 @@
* A PostOfficeTestBase
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
* @version <tt>$Revision$</tt>
*
* $Id$
@@ -60,12 +71,69 @@
*/
public class PostOfficeTestBase extends MessagingTestCase
{
- // Constants -----------------------------------------------------
+ // Constants ------------------------------------------------------------------------------------
- // Static --------------------------------------------------------
-
- // Attributes ----------------------------------------------------
+ // Static ---------------------------------------------------------------------------------------
+ protected static ClusteredPostOffice createClusteredPostOffice(int nodeID,
+ String groupName,
+ ServiceContainer sc,
+ MessageStore ms,
+ PersistenceManager pm,
+ TransactionRepository tr,
+ QueuedExecutorPool pool)
+ throws Exception
+ {
+ return createClusteredPostOffice(nodeID, groupName, 5000, 5000, new NullMessagePullPolicy(),
+ sc, ms, pm, tr, pool);
+ }
+
+
+ protected static ClusteredPostOffice createClusteredPostOffice(int nodeID,
+ String groupName,
+ long stateTimeout,
+ long castTimeout,
+ MessagePullPolicy pullPolicy,
+ ServiceContainer sc,
+ MessageStore ms,
+ PersistenceManager pm,
+ TransactionRepository tr,
+ QueuedExecutorPool pool)
+ throws Exception
+ {
+ FilterFactory ff = new SimpleFilterFactory();
+ ClusterRouterFactory rf = new DefaultRouterFactory();
+ FailoverMapper mapper = new DefaultFailoverMapper();
+ ConditionFactory cf = new SimpleConditionFactory();
+
+ // we give priority to testing the JGroups stack configurations we ship with the release
+
+ // TODO (ovidiu) we're currently using the mysql configuration file. We could refine this even
+ // further by actually figuring out what database we're currently running on, and use
+ // that file. Useful when we'll run database matrix tests.
+ // See http://jira.jboss.org/jira/browse/JBMESSAGING-793
+ String configFilePath = "server/default/deploy/clustered-mysql-persistence-service.xml";
+
+ // TODO (ovidiu) we're temporarily ignoring the multiplex option, it doesn't work well
+ boolean ignoreMultiplexer = true;
+ JChannelFactory jChannelFactory =
+ new ClusteredPersistenceServiceConfigFileJChannelFactory(configFilePath,
+ ignoreMultiplexer,
+ sc.getMBeanServer());
+
+ DefaultClusteredPostOffice postOffice =
+ new DefaultClusteredPostOffice(sc.getDataSource(), sc.getTransactionManager(),
+ sc.getClusteredPostOfficeSQLProperties(), true, nodeID,
+ "Clustered", ms, pm, tr, ff, cf, pool,
+ groupName, jChannelFactory,
+ stateTimeout, castTimeout, pullPolicy, rf, mapper, 1000);
+ postOffice.start();
+
+ return postOffice;
+ }
+
+ // Attributes -----------------------------------------------------------------------------------
+
protected ServiceContainer sc;
protected IDManager channelIDManager;
@@ -90,58 +158,7 @@
}
// Public --------------------------------------------------------
-
- public void setUp() throws Exception
- {
- super.setUp();
-
- sc = new ServiceContainer("all");
-
- sc.start();
-
- pm =
- new JDBCPersistenceManager(sc.getDataSource(), sc.getTransactionManager(),
- sc.getPersistenceManagerSQLProperties(),
- true, true, true, 100);
- pm.start();
-
- transactionIDManager = new IDManager("TRANSACTION_ID", 10, pm);
- transactionIDManager.start();
-
- ms = new SimpleMessageStore();
- ms.start();
-
- tr = new TransactionRepository(pm, ms, transactionIDManager);
- tr.start();
-
- pool = new QueuedExecutorPool(10);
-
- channelIDManager = new IDManager("CHANNEL_ID", 10, pm);
- channelIDManager.start();
-
- conditionFactory = new SimpleConditionFactory();
-
- log.debug("setup done");
- }
-
- public void tearDown() throws Exception
- {
- if (!ServerManagement.isRemote())
- {
- sc.stop();
- sc = null;
- }
- pm.stop();
- tr.stop();
- ms.stop();
- transactionIDManager.stop();
- channelIDManager.stop();
-
- super.tearDown();
- }
- // Public --------------------------------------------------------
-
protected PostOffice createPostOffice() throws Exception
{
FilterFactory ff = new SimpleFilterFactory();
@@ -225,7 +242,56 @@
assertNotNull(msgs);
assertTrue(msgs.isEmpty());
}
-
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ sc = new ServiceContainer("all");
+
+ sc.start();
+
+ pm =
+ new JDBCPersistenceManager(sc.getDataSource(), sc.getTransactionManager(),
+ sc.getPersistenceManagerSQLProperties(),
+ true, true, true, 100);
+ pm.start();
+
+ transactionIDManager = new IDManager("TRANSACTION_ID", 10, pm);
+ transactionIDManager.start();
+
+ ms = new SimpleMessageStore();
+ ms.start();
+
+ tr = new TransactionRepository(pm, ms, transactionIDManager);
+ tr.start();
+
+ pool = new QueuedExecutorPool(10);
+
+ channelIDManager = new IDManager("CHANNEL_ID", 10, pm);
+ channelIDManager.start();
+
+ conditionFactory = new SimpleConditionFactory();
+
+ log.debug("setup done");
+ }
+
+ protected void tearDown() throws Exception
+ {
+ if (!ServerManagement.isRemote())
+ {
+ sc.stop();
+ sc = null;
+ }
+ pm.stop();
+ tr.stop();
+ ms.stop();
+ transactionIDManager.stop();
+ channelIDManager.stop();
+
+ super.tearDown();
+ }
+
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
Added: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/ClusteredPersistenceServiceConfigFileJChannelFactory.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/ClusteredPersistenceServiceConfigFileJChannelFactory.java (rev 0)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/ClusteredPersistenceServiceConfigFileJChannelFactory.java 2007-01-31 11:29:48 UTC (rev 2116)
@@ -0,0 +1,203 @@
+/**
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.test.messaging.core.plugin.postoffice.cluster;
+
+import org.jboss.messaging.core.plugin.postoffice.cluster.jchannelfactory.JChannelFactory;
+import org.jboss.messaging.core.plugin.postoffice.cluster.jchannelfactory.MultiplexerJChannelFactory;
+import org.jboss.logging.Logger;
+import org.jboss.test.messaging.tools.jboss.ServiceDeploymentDescriptor;
+import org.jboss.test.messaging.tools.jboss.MBeanConfigurationElement;
+import org.jboss.jms.util.XMLUtil;
+import org.jgroups.JChannel;
+import org.w3c.dom.Element;
+
+import javax.management.ObjectName;
+import javax.management.MBeanServer;
+import java.net.URL;
+
+/**
+ * A JChannelFactory that reads the configuration of its synchronous/asynchronous JChannels from a
+ * Messaging-style clustered persistence service configuration file (usually shipped with a
+ * Messaging installation). The idea is to give priority to testing whatever we ship.
+ *
+ * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
+ * @version <tt>$Revision$</tt>
+ *
+ * $Id$
+ */
+public class ClusteredPersistenceServiceConfigFileJChannelFactory implements JChannelFactory
+{
+ // Constants ------------------------------------------------------------------------------------
+
+ public static final Logger log =
+ Logger.getLogger(ClusteredPersistenceServiceConfigFileJChannelFactory.class);
+
+ // Static ---------------------------------------------------------------------------------------
+
+ // Attributes -----------------------------------------------------------------------------------
+
+ private String configFilePath;
+
+ // we're either using a delegate MultiplexerJChannelFactory, if we find one configured in the
+ // file ...
+ private JChannelFactory multiplexorDelegate;
+
+ // ... or just plain XML configuration.
+ private Element syncConfig;
+ private Element asyncConfig;
+
+ // Constructors ---------------------------------------------------------------------------------
+
+ /**
+ * @param configFilePath - the configuration file to read JGroups stack configurations from. Must
+ * be relative to classpath components in order to be found.
+ * @param skipMultiplex - if true, ignore multiplex option, even if a channel factory name is
+ * found in the configuration file. Otherwise, the channel factory will take priority
+ * if found.
+ * @param mbeanServer - the MBeanServer instance, needed in case a channel factory name is found
+ * in the configuration file. In this situation, the channel factory is preferred.
+ * Irrelevant if skipMultiplex is true.
+ */
+ public ClusteredPersistenceServiceConfigFileJChannelFactory(String configFilePath,
+ boolean skipMultiplex,
+ MBeanServer mbeanServer)
+ throws Exception
+ {
+ this.configFilePath = configFilePath;
+ init(configFilePath, skipMultiplex, mbeanServer);
+ }
+
+ // JChannelFactory ------------------------------------------------------------------------------
+
+ public JChannel createSyncChannel() throws Exception
+ {
+ if (multiplexorDelegate != null)
+ {
+ return multiplexorDelegate.createSyncChannel();
+ }
+ else
+ {
+ return new JChannel(syncConfig);
+ }
+ }
+
+ public JChannel createASyncChannel() throws Exception
+ {
+ if (multiplexorDelegate != null)
+ {
+ return multiplexorDelegate.createASyncChannel();
+ }
+ else
+ {
+ return new JChannel(asyncConfig);
+ }
+ }
+
+ // Public ---------------------------------------------------------------------------------------
+
+ public String toString()
+ {
+ return "ClusteredPersistenceServiceConfigFileJChannelFactory[" + configFilePath + "]";
+ }
+
+ // Package protected ----------------------------------------------------------------------------
+
+ // Protected ------------------------------------------------------------------------------------
+
+ // Private --------------------------------------------------------------------------------------
+
+ private void init(String configFilePath, boolean skipMultiplex, MBeanServer mbeanServer)
+ throws Exception
+ {
+ log.debug("using configuration file " + configFilePath);
+
+ URL configFileURL = getClass().getClassLoader().getResource(configFilePath);
+
+ if (configFileURL == null)
+ {
+ throw new Exception("Cannot find " + configFilePath + " in the classpath");
+ }
+
+ ServiceDeploymentDescriptor pdd = new ServiceDeploymentDescriptor(configFileURL);
+
+ MBeanConfigurationElement postOfficeConfig =
+ (MBeanConfigurationElement)pdd.query("service", "PostOffice").iterator().next();
+
+ // first, we try to use a channel factory service, if we find one configured
+ String s = (String)postOfficeConfig.getAttributeValue("ChannelFactoryName");
+
+ if (s != null && !skipMultiplex)
+ {
+ // there's a chance we can use a multiplexer service
+ ObjectName channelFactoryName = new ObjectName(s);
+
+ String channelPartitionName =
+ (String)postOfficeConfig.getAttributeValue("ChannelPartitionName");
+
+ if (channelPartitionName == null)
+ {
+ throw new IllegalStateException("Cannot find ChannelPartitionName");
+ }
+
+ String syncChannelName = (String)postOfficeConfig.getAttributeValue("SyncChannelName");
+
+ if (syncChannelName == null)
+ {
+ throw new IllegalStateException("Cannot find SyncChannelName");
+ }
+
+ String asyncChannelName = (String)postOfficeConfig.getAttributeValue("AsyncChannelName");
+
+ if (asyncChannelName == null)
+ {
+ throw new IllegalStateException("Cannot find AsyncChannelName");
+ }
+
+ try
+ {
+ if(mbeanServer.getMBeanInfo(channelFactoryName) != null)
+ {
+ multiplexorDelegate =
+ new MultiplexerJChannelFactory(mbeanServer, channelFactoryName,
+ channelPartitionName, syncChannelName,
+ asyncChannelName);
+
+ // initialization ends here, we've found what we were looking for
+ return;
+ }
+ }
+ catch (Exception e)
+ {
+ // that's alright, no multiplexer there, use the regular XML configuration
+ log.debug("Wasn't able to find " + s);
+ }
+ }
+
+ // the only chance now is to use the XML configurations
+
+ s = (String)postOfficeConfig.getAttributeValue("SyncChannelConfig");
+
+ if (s == null)
+ {
+ throw new IllegalStateException("Cannot find SyncChannelConfig");
+ }
+
+ syncConfig = XMLUtil.stringToElement(s);
+
+ s = (String)postOfficeConfig.getAttributeValue("AsyncChannelConfig");
+
+ if (s == null)
+ {
+ throw new IllegalStateException("Cannot find AsyncChannelConfig");
+ }
+
+ asyncConfig = XMLUtil.stringToElement(s);
+ }
+
+ // Inner classes --------------------------------------------------------------------------------
+
+}
Property changes on: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/ClusteredPersistenceServiceConfigFileJChannelFactory.java
___________________________________________________________________
Name: svn:keywords
+ Id LastChangedDate Author Revision
Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeTest.java 2007-01-31 11:29:26 UTC (rev 2115)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeTest.java 2007-01-31 11:29:48 UTC (rev 2116)
@@ -25,26 +25,15 @@
import java.util.Iterator;
import java.util.List;
-import org.jboss.messaging.core.FilterFactory;
import org.jboss.messaging.core.Message;
import org.jboss.messaging.core.MessageReference;
import org.jboss.messaging.core.local.PagingFilteredQueue;
import org.jboss.messaging.core.plugin.contract.ClusteredPostOffice;
-import org.jboss.messaging.core.plugin.contract.ConditionFactory;
-import org.jboss.messaging.core.plugin.contract.FailoverMapper;
import org.jboss.messaging.core.plugin.postoffice.Binding;
-import org.jboss.messaging.core.plugin.postoffice.cluster.ClusterRouterFactory;
-import org.jboss.messaging.core.plugin.postoffice.cluster.DefaultClusteredPostOffice;
-import org.jboss.messaging.core.plugin.postoffice.cluster.DefaultFailoverMapper;
-import org.jboss.messaging.core.plugin.postoffice.cluster.DefaultRouterFactory;
import org.jboss.messaging.core.plugin.postoffice.cluster.LocalClusteredQueue;
-import org.jboss.messaging.core.plugin.postoffice.cluster.MessagePullPolicy;
-import org.jboss.messaging.core.plugin.postoffice.cluster.NullMessagePullPolicy;
import org.jboss.messaging.core.tx.Transaction;
import org.jboss.test.messaging.core.SimpleCondition;
-import org.jboss.test.messaging.core.SimpleConditionFactory;
import org.jboss.test.messaging.core.SimpleFilter;
-import org.jboss.test.messaging.core.SimpleFilterFactory;
import org.jboss.test.messaging.core.SimpleReceiver;
import org.jboss.test.messaging.core.plugin.postoffice.DefaultPostOfficeTest;
import org.jboss.test.messaging.util.CoreMessageFactory;
@@ -63,48 +52,36 @@
*/
public class DefaultClusteredPostOfficeTest extends DefaultPostOfficeTest
{
- // Constants -----------------------------------------------------
+ // Constants ------------------------------------------------------------------------------------
- // Static --------------------------------------------------------
+ // Static ---------------------------------------------------------------------------------------
- // Attributes ----------------------------------------------------
+ // Attributes -----------------------------------------------------------------------------------
- // Constructors --------------------------------------------------
+ // Constructors ---------------------------------------------------------------------------------
public DefaultClusteredPostOfficeTest(String name)
{
super(name);
}
- // Public --------------------------------------------------------
+ // Public ---------------------------------------------------------------------------------------
- public void setUp() throws Exception
- {
- super.setUp();
- }
-
- public void tearDown() throws Exception
- {
- super.tearDown();
- }
-
public final void testSimpleJoinLeave() throws Throwable
{
ClusteredPostOffice office1 = null;
-
ClusteredPostOffice office2 = null;
-
ClusteredPostOffice office3 = null;
try
{
log.trace("Starting office 1");
- office1 = createClusteredPostOffice(1, "testgroup");
-
+ office1 = createClusteredPostOffice(1, "testgroup", sc, ms, pm, tr, pool);
+
log.trace("starting office 2");
- office2 = createClusteredPostOffice(2, "testgroup");
+ office2 = createClusteredPostOffice(2, "testgroup", sc, ms, pm, tr, pool);
- office3 = createClusteredPostOffice(3, "testgroup");
+ office3 = createClusteredPostOffice(3, "testgroup", sc, ms, pm, tr, pool);
Thread.sleep(2000);
@@ -152,7 +129,7 @@
{
// Start one office
- office1 = createClusteredPostOffice(1, "testgroup");
+ office1 = createClusteredPostOffice(1, "testgroup", sc, ms, pm, tr, pool);
log.info("Created office1");
@@ -178,7 +155,7 @@
// Start another office - make sure it picks up the bindings from the first node
- office2 = createClusteredPostOffice(2, "testgroup");
+ office2 = createClusteredPostOffice(2, "testgroup", sc, ms, pm, tr, pool);
log.info("Created office 2");
@@ -274,7 +251,7 @@
// Add a third office
- office3 = createClusteredPostOffice(3, "testgroup");
+ office3 = createClusteredPostOffice(3, "testgroup", sc, ms, pm, tr, pool);
// Make sure it picks up the bindings
@@ -418,9 +395,9 @@
assertEquivalent(binding6, (Binding)iter.next());
// Restart office 1 and office 2
- office1 = createClusteredPostOffice(1, "testgroup");
+ office1 = createClusteredPostOffice(1, "testgroup", sc, ms, pm, tr, pool);
- office2 = createClusteredPostOffice(2, "testgroup");
+ office2 = createClusteredPostOffice(2, "testgroup", sc, ms, pm, tr, pool);
bindings = office1.listAllBindingsForCondition(new SimpleCondition("topic1"));
assertNotNull(bindings);
@@ -453,9 +430,9 @@
office3.stop();
// Start them all
- office1 = createClusteredPostOffice(1, "testgroup");
- office2 = createClusteredPostOffice(2, "testgroup");
- office3 = createClusteredPostOffice(3, "testgroup");
+ office1 = createClusteredPostOffice(1, "testgroup", sc, ms, pm, tr, pool);
+ office2 = createClusteredPostOffice(2, "testgroup", sc, ms, pm, tr, pool);
+ office3 = createClusteredPostOffice(3, "testgroup", sc, ms, pm, tr, pool);
// Only the durable queue should survive
@@ -616,34 +593,43 @@
try
{
- office1 = createClusteredPostOffice(1, "testgroup");
+ office1 = createClusteredPostOffice(1, "testgroup", sc, ms, pm, tr, pool);
- office2 = createClusteredPostOffice(2, "testgroup");
+ office2 = createClusteredPostOffice(2, "testgroup", sc, ms, pm, tr, pool);
- LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, 1, "queue1", channelIDManager.getID(), ms, pm, true, false, (QueuedExecutor)pool.get(), -1, null, tr);
+ LocalClusteredQueue queue1 =
+ new LocalClusteredQueue(office1, 1, "queue1", channelIDManager.getID(), ms, pm,
+ true, false, (QueuedExecutor)pool.get(), -1, null, tr);
- Binding binding1 = office1.bindClusteredQueue(new SimpleCondition("queue1"), queue1);
+ office1.bindClusteredQueue(new SimpleCondition("queue1"), queue1);
- LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, 2, "queue1", channelIDManager.getID(), ms, pm, true, false, (QueuedExecutor)pool.get(), -1, null, tr);
+ LocalClusteredQueue queue2 =
+ new LocalClusteredQueue(office2, 2, "queue1", channelIDManager.getID(), ms, pm,
+ true, false, (QueuedExecutor)pool.get(), -1, null, tr);
- Binding binding2 = office2.bindClusteredQueue(new SimpleCondition("queue1"), queue2);
+ office2.bindClusteredQueue(new SimpleCondition("queue1"), queue2);
- LocalClusteredQueue queue3 = new LocalClusteredQueue(office1, 1, "queue1", channelIDManager.getID(), ms, pm, true, false, (QueuedExecutor)pool.get(), -1, null, tr);
+ LocalClusteredQueue queue3 =
+ new LocalClusteredQueue(office1, 1, "queue1", channelIDManager.getID(), ms, pm,
+ true, false, (QueuedExecutor)pool.get(), -1, null, tr);
try
{
- Binding binding3 = office1.bindClusteredQueue(new SimpleCondition("queue1"), queue3);
+ office1.bindClusteredQueue(new SimpleCondition("queue1"), queue3);
fail();
}
catch (Exception e)
{
//Ok
}
- LocalClusteredQueue queue4 = new LocalClusteredQueue(office2, 2, "queue1", channelIDManager.getID(), ms, pm, true, false, (QueuedExecutor)pool.get(), -1, null, tr);
+
+ LocalClusteredQueue queue4 =
+ new LocalClusteredQueue(office2, 2, "queue1", channelIDManager.getID(), ms, pm,
+ true, false, (QueuedExecutor)pool.get(), -1, null, tr);
try
{
- Binding binding4 = office2.bindClusteredQueue(new SimpleCondition("queue1"), queue4);
+ office2.bindClusteredQueue(new SimpleCondition("queue1"), queue4);
fail();
}
catch (Exception e)
@@ -655,14 +641,18 @@
office1.unbindClusteredQueue("queue1");
- LocalClusteredQueue queue5 = new LocalClusteredQueue(office1, 1, "queue1", channelIDManager.getID(), ms, pm, true, false, (QueuedExecutor)pool.get(), -1, null, tr);
+ LocalClusteredQueue queue5 =
+ new LocalClusteredQueue(office1, 1, "queue1", channelIDManager.getID(), ms, pm,
+ true, false, (QueuedExecutor)pool.get(), -1, null, tr);
- Binding binding5 = office1.bindClusteredQueue(new SimpleCondition("queue1"), queue5);
+ office1.bindClusteredQueue(new SimpleCondition("queue1"), queue5);
- PagingFilteredQueue queue6 = new PagingFilteredQueue("queue1", channelIDManager.getID(), ms, pm, true, false, (QueuedExecutor)pool.get(), -1, null);
+ PagingFilteredQueue queue6 =
+ new PagingFilteredQueue("queue1", channelIDManager.getID(), ms, pm,
+ true, false, (QueuedExecutor)pool.get(), -1, null);
try
{
- Binding binding6 = office1.bindQueue(new SimpleCondition("queue1"), queue6);
+ office1.bindQueue(new SimpleCondition("queue1"), queue6);
fail();
}
catch (Exception e)
@@ -672,17 +662,26 @@
office1.unbindClusteredQueue("queue1");
- //It should be possible to bind queues locally into a clustered post office
- LocalClusteredQueue queue7 = new LocalClusteredQueue(office1, 1, "queue1", channelIDManager.getID(), ms, pm, true, false, (QueuedExecutor)pool.get(), -1, null, tr);
- Binding binding7 = office1.bindQueue(new SimpleCondition("queue1"), queue7);
+ // It should be possible to bind queues locally into a clustered post office
+ LocalClusteredQueue queue7 =
+ new LocalClusteredQueue(office1, 1, "queue1", channelIDManager.getID(), ms, pm,
+ true, false, (QueuedExecutor)pool.get(), -1, null, tr);
+
+ office1.bindQueue(new SimpleCondition("queue1"), queue7);
- LocalClusteredQueue queue8 = new LocalClusteredQueue(office2, 2, "queue1", channelIDManager.getID(), ms, pm, true, false, (QueuedExecutor)pool.get(), -1, null, tr);
- Binding binding8 = office2.bindQueue(new SimpleCondition("queue1"), queue8);
+ LocalClusteredQueue queue8 =
+ new LocalClusteredQueue(office2, 2, "queue1", channelIDManager.getID(), ms, pm,
+ true, false, (QueuedExecutor)pool.get(), -1, null, tr);
+
+ office2.bindQueue(new SimpleCondition("queue1"), queue8);
- LocalClusteredQueue queue9 = new LocalClusteredQueue(office2, 2, "queue1", channelIDManager.getID(), ms, pm, true, false, (QueuedExecutor)pool.get(), -1, null, tr);
+ LocalClusteredQueue queue9 =
+ new LocalClusteredQueue(office2, 2, "queue1", channelIDManager.getID(), ms, pm,
+ true, false, (QueuedExecutor)pool.get(), -1, null, tr);
+
try
{
- Binding binding9 = office1.bindQueue(new SimpleCondition("queue1"), queue9);
+ office1.bindQueue(new SimpleCondition("queue1"), queue9);
fail();
}
catch (Exception e)
@@ -705,11 +704,12 @@
}
}
- // Package protected ---------------------------------------------
+ // Package protected ----------------------------------------------------------------------------
- // Protected -----------------------------------------------------
+ // Protected ------------------------------------------------------------------------------------
- protected void clusteredRouteWithFilter(boolean persistentMessage, boolean recoverable) throws Throwable
+ protected void clusteredRouteWithFilter(boolean persistentMessage, boolean recoverable)
+ throws Throwable
{
ClusteredPostOffice office1 = null;
@@ -717,29 +717,40 @@
try
{
- office1 = createClusteredPostOffice(1, "testgroup");
- office2 = createClusteredPostOffice(2, "testgroup");
+ office1 = createClusteredPostOffice(1, "testgroup", sc, ms, pm, tr, pool);
+ office2 = createClusteredPostOffice(2, "testgroup", sc, ms, pm, tr, pool);
SimpleFilter filter1 = new SimpleFilter(2);
SimpleFilter filter2 = new SimpleFilter(3);
- LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, 1, "queue1", channelIDManager.getID(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), -1, filter1, tr);
- Binding binding1 =
- office1.bindClusteredQueue(new SimpleCondition("topic1"), queue1);
+ LocalClusteredQueue queue1 =
+ new LocalClusteredQueue(office1, 1, "queue1", channelIDManager.getID(), ms, pm,
+ true, recoverable, (QueuedExecutor)pool.get(), -1, filter1, tr);
+
+ office1.bindClusteredQueue(new SimpleCondition("topic1"), queue1);
- LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, 2, "queue2", channelIDManager.getID(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), -1, filter2, tr);
- Binding binding2 =
- office2.bindClusteredQueue(new SimpleCondition("topic1"), queue2);
+ LocalClusteredQueue queue2 =
+ new LocalClusteredQueue(office2, 2, "queue2", channelIDManager.getID(), ms, pm,
+ true, recoverable, (QueuedExecutor)pool.get(), -1, filter2, tr);
+
+ office2.bindClusteredQueue(new SimpleCondition("topic1"), queue2);
- LocalClusteredQueue queue3 = new LocalClusteredQueue(office2, 2, "queue3", channelIDManager.getID(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), -1, null, tr);
- Binding binding3 =
- office2.bindClusteredQueue(new SimpleCondition("topic1"), queue3);
+ LocalClusteredQueue queue3 =
+ new LocalClusteredQueue(office2, 2, "queue3", channelIDManager.getID(), ms, pm, true,
+ recoverable, (QueuedExecutor)pool.get(), -1, null, tr);
+
+ office2.bindClusteredQueue(new SimpleCondition("topic1"), queue3);
SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+
queue1.add(receiver1);
+
SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+
queue2.add(receiver2);
+
SimpleReceiver receiver3 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+
queue3.add(receiver3);
Message msg1 = CoreMessageFactory.createCoreMessage(1);
@@ -824,8 +835,8 @@
try
{
- office1 = createClusteredPostOffice(1, "testgroup");
- office2 = createClusteredPostOffice(2, "testgroup");
+ office1 = createClusteredPostOffice(1, "testgroup", sc, ms, pm, tr, pool);
+ office2 = createClusteredPostOffice(2, "testgroup", sc, ms, pm, tr, pool);
//Two topics with a mixture of durable and non durable subscriptions
@@ -1013,42 +1024,62 @@
try
{
- office1 = createClusteredPostOffice(1, "testgroup");
- office2 = createClusteredPostOffice(2, "testgroup");
- office3 = createClusteredPostOffice(3, "testgroup");
- office4 = createClusteredPostOffice(4, "testgroup");
- office5 = createClusteredPostOffice(5, "testgroup");
- office6 = createClusteredPostOffice(6, "testgroup");
+ office1 = createClusteredPostOffice(1, "testgroup", sc, ms, pm, tr, pool);
+ office2 = createClusteredPostOffice(2, "testgroup", sc, ms, pm, tr, pool);
+ office3 = createClusteredPostOffice(3, "testgroup", sc, ms, pm, tr, pool);
+ office4 = createClusteredPostOffice(4, "testgroup", sc, ms, pm, tr, pool);
+ office5 = createClusteredPostOffice(5, "testgroup", sc, ms, pm, tr, pool);
+ office6 = createClusteredPostOffice(6, "testgroup", sc, ms, pm, tr, pool);
- //We deploy the queue on nodes 1, 2, 3, 4 and 5
- //We don't deploy on node 6
+ // We deploy the queue on nodes 1, 2, 3, 4 and 5
+ // We don't deploy on node 6
- LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, 1, "queue1", channelIDManager.getID(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), -1, null, tr);
- Binding binding1 = office1.bindClusteredQueue(new SimpleCondition("queue1"), queue1);
+ LocalClusteredQueue queue1 =
+ new LocalClusteredQueue(office1, 1, "queue1", channelIDManager.getID(), ms, pm, true,
+ recoverable, (QueuedExecutor)pool.get(), -1, null, tr);
+
+ office1.bindClusteredQueue(new SimpleCondition("queue1"), queue1);
+
SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
queue1.add(receiver1);
- LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, 2, "queue1", channelIDManager.getID(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), -1, null, tr);
- Binding binding2 = office2.bindClusteredQueue(new SimpleCondition("queue1"), queue2);
+ LocalClusteredQueue queue2 =
+ new LocalClusteredQueue(office2, 2, "queue1", channelIDManager.getID(), ms, pm, true,
+ recoverable, (QueuedExecutor)pool.get(), -1, null, tr);
+
+ office2.bindClusteredQueue(new SimpleCondition("queue1"), queue2);
+
SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
queue2.add(receiver2);
- LocalClusteredQueue queue3 = new LocalClusteredQueue(office3, 3, "queue1", channelIDManager.getID(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), -1, null, tr);
- Binding binding3 = office3.bindClusteredQueue(new SimpleCondition("queue1"), queue3);
+ LocalClusteredQueue queue3 =
+ new LocalClusteredQueue(office3, 3, "queue1", channelIDManager.getID(), ms, pm, true,
+ recoverable, (QueuedExecutor)pool.get(), -1, null, tr);
+
+ office3.bindClusteredQueue(new SimpleCondition("queue1"), queue3);
+
SimpleReceiver receiver3 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
queue3.add(receiver3);
- LocalClusteredQueue queue4 = new LocalClusteredQueue(office4, 4, "queue1", channelIDManager.getID(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), -1, null, tr);
- Binding binding4 = office4.bindClusteredQueue(new SimpleCondition("queue1"), queue4);
+ LocalClusteredQueue queue4 =
+ new LocalClusteredQueue(office4, 4, "queue1", channelIDManager.getID(), ms, pm,
+ true, recoverable, (QueuedExecutor)pool.get(), -1, null, tr);
+
+ office4.bindClusteredQueue(new SimpleCondition("queue1"), queue4);
+
SimpleReceiver receiver4 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
queue4.add(receiver4);
- LocalClusteredQueue queue5 = new LocalClusteredQueue(office5, 5, "queue1", channelIDManager.getID(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), -1, null, tr);
- Binding binding5 = office5.bindClusteredQueue(new SimpleCondition("queue1"), queue5);
+ LocalClusteredQueue queue5 =
+ new LocalClusteredQueue(office5, 5, "queue1", channelIDManager.getID(), ms, pm, true,
+ recoverable, (QueuedExecutor)pool.get(), -1, null, tr);
+
+ office5.bindClusteredQueue(new SimpleCondition("queue1"), queue5);
SimpleReceiver receiver5 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
queue5.add(receiver5);
- //We are using a AlwaysLocalRoutingPolicy so only the local queue should ever get the message if the filter matches
+ // We are using an AlwaysLocalRoutingPolicy so only the local queue should ever get the
+ // message if the filter matches
Message msg = CoreMessageFactory.createCoreMessage(1, persistentMessage, null);
MessageReference ref = ms.reference(msg);
@@ -1159,35 +1190,43 @@
}
- /*
+ /**
* Clustered post offices should be able to have local queues bound to them too.
*/
protected void routeLocalQueues(boolean persistentMessage, boolean recoverable) throws Throwable
{
ClusteredPostOffice office1 = null;
-
ClusteredPostOffice office2 = null;
-
ClusteredPostOffice office3 = null;
try
{
- office1 = createClusteredPostOffice(1, "testgroup");
- office2 = createClusteredPostOffice(2, "testgroup");
- office3 = createClusteredPostOffice(3, "testgroup");
+ office1 = createClusteredPostOffice(1, "testgroup", sc, ms, pm, tr, pool);
+ office2 = createClusteredPostOffice(2, "testgroup", sc, ms, pm, tr, pool);
+ office3 = createClusteredPostOffice(3, "testgroup", sc, ms, pm, tr, pool);
- LocalClusteredQueue sub1 = new LocalClusteredQueue(office1, 1, "sub1", channelIDManager.getID(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), -1, null, tr);
- Binding binding1 = office1.bindQueue(new SimpleCondition("topic"), sub1);
+ LocalClusteredQueue sub1 =
+ new LocalClusteredQueue(office1, 1, "sub1", channelIDManager.getID(), ms, pm, true,
+ recoverable, (QueuedExecutor)pool.get(), -1, null, tr);
+
+ office1.bindQueue(new SimpleCondition("topic"), sub1);
+
SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
sub1.add(receiver1);
- LocalClusteredQueue sub2 = new LocalClusteredQueue(office2, 2, "sub2", channelIDManager.getID(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), -1, null, tr);
- Binding binding2 = office2.bindQueue(new SimpleCondition("topic"), sub2);
+ LocalClusteredQueue sub2 =
+ new LocalClusteredQueue(office2, 2, "sub2", channelIDManager.getID(), ms, pm, true,
+ recoverable, (QueuedExecutor)pool.get(), -1, null, tr);
+
+ office2.bindQueue(new SimpleCondition("topic"), sub2);
SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
sub2.add(receiver2);
- LocalClusteredQueue sub3 = new LocalClusteredQueue(office3, 3, "sub3", channelIDManager.getID(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), -1, null, tr);
- Binding binding3 = office3.bindQueue(new SimpleCondition("topic"), sub3);
+ LocalClusteredQueue sub3 =
+ new LocalClusteredQueue(office3, 3, "sub3", channelIDManager.getID(), ms, pm, true,
+ recoverable, (QueuedExecutor)pool.get(), -1, null, tr);
+
+ office3.bindQueue(new SimpleCondition("topic"), sub3);
SimpleReceiver receiver3 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
sub3.add(receiver3);
@@ -1248,8 +1287,9 @@
- /*
- * We set up a complex scenario with multiple subscriptions, shared and unshared on different nodes
+ /**
+ * We set up a complex scenario with multiple subscriptions, shared and unshared, on different
+ * nodes.
*
* node1: no subscriptions
* node2: 2 non durable
@@ -1260,128 +1300,159 @@
* node7: 1 shared durable (shared2)
*
* Then we send mess
- *
- *
*/
protected void routeComplexTopic(boolean persistent) throws Throwable
{
ClusteredPostOffice office1 = null;
-
ClusteredPostOffice office2 = null;
-
ClusteredPostOffice office3 = null;
-
ClusteredPostOffice office4 = null;
-
ClusteredPostOffice office5 = null;
-
ClusteredPostOffice office6 = null;
-
ClusteredPostOffice office7 = null;
try
{
- office1 = createClusteredPostOffice(1, "testgroup");
- office2 = createClusteredPostOffice(2, "testgroup");
- office3 = createClusteredPostOffice(3, "testgroup");
- office4 = createClusteredPostOffice(4, "testgroup");
- office5 = createClusteredPostOffice(5, "testgroup");
- office6 = createClusteredPostOffice(6, "testgroup");
- office7 = createClusteredPostOffice(7, "testgroup");
+ office1 = createClusteredPostOffice(1, "testgroup", sc, ms, pm, tr, pool);
+ office2 = createClusteredPostOffice(2, "testgroup", sc, ms, pm, tr, pool);
+ office3 = createClusteredPostOffice(3, "testgroup", sc, ms, pm, tr, pool);
+ office4 = createClusteredPostOffice(4, "testgroup", sc, ms, pm, tr, pool);
+ office5 = createClusteredPostOffice(5, "testgroup", sc, ms, pm, tr, pool);
+ office6 = createClusteredPostOffice(6, "testgroup", sc, ms, pm, tr, pool);
+ office7 = createClusteredPostOffice(7, "testgroup", sc, ms, pm, tr, pool);
//Node 2
//======
- //Non durable 1 on node 2
- LocalClusteredQueue nonDurable1 = new LocalClusteredQueue(office2, 2, "nondurable1", channelIDManager.getID(), ms, pm, true, false, (QueuedExecutor)pool.get(), -1, null, tr);
- Binding binding1 = office2.bindClusteredQueue(new SimpleCondition("topic"), nonDurable1);
+ // Non durable 1 on node 2
+ LocalClusteredQueue nonDurable1 =
+ new LocalClusteredQueue(office2, 2, "nondurable1", channelIDManager.getID(), ms, pm,
+ true, false, (QueuedExecutor)pool.get(), -1, null, tr);
+
+ office2.bindClusteredQueue(new SimpleCondition("topic"), nonDurable1);
SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
nonDurable1.add(receiver1);
- //Non durable 2 on node 2
- LocalClusteredQueue nonDurable2 = new LocalClusteredQueue(office2, 2, "nondurable2", channelIDManager.getID(), ms, pm, true, false, (QueuedExecutor)pool.get(), -1, null, tr);
- Binding binding2 = office2.bindClusteredQueue(new SimpleCondition("topic"), nonDurable2);
+ // Non durable 2 on node 2
+ LocalClusteredQueue nonDurable2 =
+ new LocalClusteredQueue(office2, 2, "nondurable2", channelIDManager.getID(), ms, pm,
+ true, false, (QueuedExecutor)pool.get(), -1, null, tr);
+
+ office2.bindClusteredQueue(new SimpleCondition("topic"), nonDurable2);
SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
nonDurable2.add(receiver2);
//Node 3
//======
- //Non shared durable
- LocalClusteredQueue nonSharedDurable1 = new LocalClusteredQueue(office3, 3, "nonshareddurable1", channelIDManager.getID(), ms, pm, true, true, (QueuedExecutor)pool.get(), -1, null, tr);
- Binding binding3 = office3.bindClusteredQueue(new SimpleCondition("topic"), nonSharedDurable1);
+ // Non shared durable
+ LocalClusteredQueue nonSharedDurable1 =
+ new LocalClusteredQueue(office3, 3, "nonshareddurable1", channelIDManager.getID(), ms,
+ pm, true, true, (QueuedExecutor)pool.get(), -1, null, tr);
+
+ office3.bindClusteredQueue(new SimpleCondition("topic"), nonSharedDurable1);
SimpleReceiver receiver3 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
nonSharedDurable1.add(receiver3);
- //Non durable
- LocalClusteredQueue nonDurable3 = new LocalClusteredQueue(office3, 3, "nondurable3", channelIDManager.getID(), ms, pm, true, false, (QueuedExecutor)pool.get(), -1, null, tr);
- Binding binding4 = office3.bindClusteredQueue(new SimpleCondition("topic"), nonDurable3);
+ // Non durable
+ LocalClusteredQueue nonDurable3 =
+ new LocalClusteredQueue(office3, 3, "nondurable3", channelIDManager.getID(), ms, pm,
+ true, false, (QueuedExecutor)pool.get(), -1, null, tr);
+
+ office3.bindClusteredQueue(new SimpleCondition("topic"), nonDurable3);
SimpleReceiver receiver4 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
nonDurable3.add(receiver4);
//Node 4
//======
- //Shared durable
- LocalClusteredQueue sharedDurable1 = new LocalClusteredQueue(office4, 4, "shareddurable1", channelIDManager.getID(), ms, pm, true, true, (QueuedExecutor)pool.get(), -1, null, tr);
- Binding binding5 = office4.bindClusteredQueue(new SimpleCondition("topic"), sharedDurable1);
+ // Shared durable
+ LocalClusteredQueue sharedDurable1 =
+ new LocalClusteredQueue(office4, 4, "shareddurable1", channelIDManager.getID(), ms, pm,
+ true, true, (QueuedExecutor)pool.get(), -1, null, tr);
+
+ office4.bindClusteredQueue(new SimpleCondition("topic"), sharedDurable1);
SimpleReceiver receiver5 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
sharedDurable1.add(receiver5);
- //Non shared durable
- LocalClusteredQueue nonSharedDurable2 = new LocalClusteredQueue(office4, 4, "nonshareddurable2", channelIDManager.getID(), ms, pm, true, true, (QueuedExecutor)pool.get(), -1, null, tr);
- Binding binding6 = office4.bindClusteredQueue(new SimpleCondition("topic"), nonSharedDurable2);
+ // Non shared durable
+ LocalClusteredQueue nonSharedDurable2 =
+ new LocalClusteredQueue(office4, 4, "nonshareddurable2", channelIDManager.getID(), ms,
+ pm, true, true, (QueuedExecutor)pool.get(), -1, null, tr);
+
+ office4.bindClusteredQueue(new SimpleCondition("topic"), nonSharedDurable2);
SimpleReceiver receiver6 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
nonSharedDurable2.add(receiver6);
- //Non durable
- LocalClusteredQueue nonDurable4 = new LocalClusteredQueue(office4, 4, "nondurable4", channelIDManager.getID(), ms, pm, true, false, (QueuedExecutor)pool.get(), -1, null, tr);
- Binding binding7 = office4.bindClusteredQueue(new SimpleCondition("topic"), nonDurable4);
+ // Non durable
+ LocalClusteredQueue nonDurable4 =
+ new LocalClusteredQueue(office4, 4, "nondurable4", channelIDManager.getID(), ms, pm,
+ true, false, (QueuedExecutor)pool.get(), -1, null, tr);
+
+ office4.bindClusteredQueue(new SimpleCondition("topic"), nonDurable4);
SimpleReceiver receiver7 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
nonDurable4.add(receiver7);
// Non durable
- LocalClusteredQueue nonDurable5 = new LocalClusteredQueue(office4, 4, "nondurable5", channelIDManager.getID(), ms, pm, true, false, (QueuedExecutor)pool.get(), -1, null, tr);
- Binding binding8 = office4.bindClusteredQueue(new SimpleCondition("topic"), nonDurable5);
+ LocalClusteredQueue nonDurable5 =
+ new LocalClusteredQueue(office4, 4, "nondurable5", channelIDManager.getID(), ms, pm,
+ true, false, (QueuedExecutor)pool.get(), -1, null, tr);
+ office4.bindClusteredQueue(new SimpleCondition("topic"), nonDurable5);
SimpleReceiver receiver8 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
nonDurable5.add(receiver8);
- //Non durable
- LocalClusteredQueue nonDurable6 = new LocalClusteredQueue(office4, 4, "nondurable6", channelIDManager.getID(), ms, pm, true, false, (QueuedExecutor)pool.get(), -1, null, tr);
- Binding binding9 = office4.bindClusteredQueue(new SimpleCondition("topic"), nonDurable6);
+ // Non durable
+ LocalClusteredQueue nonDurable6 =
+ new LocalClusteredQueue(office4, 4, "nondurable6", channelIDManager.getID(), ms, pm,
+ true, false, (QueuedExecutor)pool.get(), -1, null, tr);
+ office4.bindClusteredQueue(new SimpleCondition("topic"), nonDurable6);
SimpleReceiver receiver9 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
nonDurable6.add(receiver9);
// Node 5
//=======
- //Shared durable
- LocalClusteredQueue sharedDurable2 = new LocalClusteredQueue(office5, 5, "shareddurable1", channelIDManager.getID(), ms, pm, true, true, (QueuedExecutor)pool.get(), -1, null, tr);
- Binding binding10 = office5.bindClusteredQueue(new SimpleCondition("topic"), sharedDurable2);
+ // Shared durable
+ LocalClusteredQueue sharedDurable2 =
+ new LocalClusteredQueue(office5, 5, "shareddurable1", channelIDManager.getID(), ms, pm,
+ true, true, (QueuedExecutor)pool.get(), -1, null, tr);
+
+ office5.bindClusteredQueue(new SimpleCondition("topic"), sharedDurable2);
SimpleReceiver receiver10 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
sharedDurable2.add(receiver10);
- //Shared durable
- LocalClusteredQueue sharedDurable3 = new LocalClusteredQueue(office5, 5, "shareddurable2", channelIDManager.getID(), ms, pm, true, true, (QueuedExecutor)pool.get(), -1, null, tr);
- Binding binding11 = office5.bindClusteredQueue(new SimpleCondition("topic"), sharedDurable3);
+ // Shared durable
+ LocalClusteredQueue sharedDurable3 =
+ new LocalClusteredQueue(office5, 5, "shareddurable2", channelIDManager.getID(), ms, pm,
+ true, true, (QueuedExecutor)pool.get(), -1, null, tr);
+
+ office5.bindClusteredQueue(new SimpleCondition("topic"), sharedDurable3);
SimpleReceiver receiver11 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
sharedDurable3.add(receiver11);
// Node 6
//=========
- LocalClusteredQueue sharedDurable4 = new LocalClusteredQueue(office6, 6, "shareddurable2", channelIDManager.getID(), ms, pm, true, true, (QueuedExecutor)pool.get(), -1, null, tr);
- Binding binding12 = office6.bindClusteredQueue(new SimpleCondition("topic"), sharedDurable4);
+ LocalClusteredQueue sharedDurable4 =
+ new LocalClusteredQueue(office6, 6, "shareddurable2", channelIDManager.getID(), ms, pm,
+ true, true, (QueuedExecutor)pool.get(), -1, null, tr);
+
+ office6.bindClusteredQueue(new SimpleCondition("topic"), sharedDurable4);
SimpleReceiver receiver12 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
sharedDurable4.add(receiver12);
- LocalClusteredQueue nonDurable7 = new LocalClusteredQueue(office6, 6, "nondurable7", channelIDManager.getID(), ms, pm, true, false, (QueuedExecutor)pool.get(), -1, null, tr);
- Binding binding13 = office6.bindClusteredQueue(new SimpleCondition("topic"), nonDurable7);
+ LocalClusteredQueue nonDurable7 =
+ new LocalClusteredQueue(office6, 6, "nondurable7", channelIDManager.getID(), ms, pm,
+ true, false, (QueuedExecutor)pool.get(), -1, null, tr);
+ office6.bindClusteredQueue(new SimpleCondition("topic"), nonDurable7);
SimpleReceiver receiver13 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
nonDurable7.add(receiver13);
//Node 7
//=======
- LocalClusteredQueue sharedDurable5 = new LocalClusteredQueue(office7, 7, "shareddurable2", channelIDManager.getID(), ms, pm, true, true, (QueuedExecutor)pool.get(), -1, null, tr);
- Binding binding14 = office7.bindClusteredQueue(new SimpleCondition("topic"), sharedDurable5);
+ LocalClusteredQueue sharedDurable5 =
+ new LocalClusteredQueue(office7, 7, "shareddurable2", channelIDManager.getID(), ms, pm,
+ true, true, (QueuedExecutor)pool.get(), -1, null, tr);
+
+ office7.bindClusteredQueue(new SimpleCondition("topic"), sharedDurable5);
SimpleReceiver receiver14 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
sharedDurable5.add(receiver14);
@@ -1706,58 +1777,90 @@
{
//Start two offices
- office1 = createClusteredPostOffice(1, "testgroup");
- office2 = createClusteredPostOffice(2, "testgroup");
+ office1 = createClusteredPostOffice(1, "testgroup", sc, ms, pm, tr, pool);
+ office2 = createClusteredPostOffice(2, "testgroup", sc, ms, pm, tr, pool);
LocalClusteredQueue[] queues = new LocalClusteredQueue[16];
Binding[] bindings = new Binding[16];
- queues[0] = new LocalClusteredQueue(office1, 1, "sub1", channelIDManager.getID(), ms, pm, true, false, (QueuedExecutor)pool.get(), -1, null, tr);
+ queues[0] =
+ new LocalClusteredQueue(office1, 1, "sub1", channelIDManager.getID(), ms, pm,
+ true, false, (QueuedExecutor)pool.get(), -1, null, tr);
bindings[0] = office1.bindClusteredQueue(new SimpleCondition("topic1"), queues[0]);
- queues[1] = new LocalClusteredQueue(office1, 1, "sub2", channelIDManager.getID(), ms, pm, true, false, (QueuedExecutor)pool.get(), -1, null, tr);
+ queues[1] =
+ new LocalClusteredQueue(office1, 1, "sub2", channelIDManager.getID(), ms, pm,
+ true, false, (QueuedExecutor)pool.get(), -1, null, tr);
bindings[1] = office1.bindClusteredQueue(new SimpleCondition("topic1"), queues[1]);
- queues[2] = new LocalClusteredQueue(office2, 2, "sub3", channelIDManager.getID(), ms, pm, true, false, (QueuedExecutor)pool.get(), -1, null, tr);
+ queues[2] =
+ new LocalClusteredQueue(office2, 2, "sub3", channelIDManager.getID(), ms, pm,
+ true, false, (QueuedExecutor)pool.get(), -1, null, tr);
bindings[2] = office2.bindClusteredQueue(new SimpleCondition("topic1"), queues[2]);
- queues[3] = new LocalClusteredQueue(office2, 2, "sub4", channelIDManager.getID(), ms, pm, true, false, (QueuedExecutor)pool.get(), -1, null, tr);
+ queues[3] =
+ new LocalClusteredQueue(office2, 2, "sub4", channelIDManager.getID(), ms, pm,
+ true, false, (QueuedExecutor)pool.get(), -1, null, tr);
bindings[3] = office2.bindClusteredQueue(new SimpleCondition("topic1"), queues[3]);
- queues[4] = new LocalClusteredQueue(office2, 2, "sub5", channelIDManager.getID(), ms, pm, true, true, (QueuedExecutor)pool.get(), -1, null, tr);
+ queues[4] =
+ new LocalClusteredQueue(office2, 2, "sub5", channelIDManager.getID(), ms, pm,
+ true, true, (QueuedExecutor)pool.get(), -1, null, tr);
bindings[4] = office2.bindClusteredQueue(new SimpleCondition("topic1"), queues[4]);
- queues[5] = new LocalClusteredQueue(office1, 1, "sub6", channelIDManager.getID(), ms, pm, true, false, (QueuedExecutor)pool.get(), -1, null, tr);
+ queues[5] =
+ new LocalClusteredQueue(office1, 1, "sub6", channelIDManager.getID(), ms, pm,
+ true, false, (QueuedExecutor)pool.get(), -1, null, tr);
bindings[5] = office1.bindClusteredQueue(new SimpleCondition("topic1"), queues[5]);
- queues[6] = new LocalClusteredQueue(office1, 1, "sub7", channelIDManager.getID(), ms, pm, true, true, (QueuedExecutor)pool.get(), -1, null, tr);
+ queues[6] =
+ new LocalClusteredQueue(office1, 1, "sub7", channelIDManager.getID(), ms, pm,
+ true, true, (QueuedExecutor)pool.get(), -1, null, tr);
bindings[6] = office1.bindClusteredQueue(new SimpleCondition("topic1"), queues[6]);
- queues[7] = new LocalClusteredQueue(office1, 1, "sub8", channelIDManager.getID(), ms, pm, true, true, (QueuedExecutor)pool.get(), -1, null, tr);
+ queues[7] =
+ new LocalClusteredQueue(office1, 1, "sub8", channelIDManager.getID(), ms, pm,
+ true, true, (QueuedExecutor)pool.get(), -1, null, tr);
bindings[7] = office1.bindClusteredQueue(new SimpleCondition("topic1"), queues[7]);
- queues[8] = new LocalClusteredQueue(office1, 1, "sub9", channelIDManager.getID(), ms, pm, true, false, (QueuedExecutor)pool.get(), -1, null, tr);
+ queues[8] =
+ new LocalClusteredQueue(office1, 1, "sub9", channelIDManager.getID(), ms, pm,
+ true, false, (QueuedExecutor)pool.get(), -1, null, tr);
bindings[8] = office1.bindClusteredQueue(new SimpleCondition("topic2"), queues[8]);
- queues[9] = new LocalClusteredQueue(office1, 1, "sub10", channelIDManager.getID(), ms, pm, true, false, (QueuedExecutor)pool.get(), -1, null, tr);
+ queues[9] =
+ new LocalClusteredQueue(office1, 1, "sub10", channelIDManager.getID(), ms, pm,
+ true, false, (QueuedExecutor)pool.get(), -1, null, tr);
bindings[9] = office1.bindClusteredQueue(new SimpleCondition("topic2"), queues[9]);
- queues[10] = new LocalClusteredQueue(office2, 2, "sub11", channelIDManager.getID(), ms, pm, true, false, (QueuedExecutor)pool.get(), -1, null, tr);
+ queues[10] =
+ new LocalClusteredQueue(office2, 2, "sub11", channelIDManager.getID(), ms, pm,
+ true, false, (QueuedExecutor)pool.get(), -1, null, tr);
bindings[10] = office2.bindClusteredQueue(new SimpleCondition("topic2"), queues[10]);
- queues[11] = new LocalClusteredQueue(office2, 2, "sub12", channelIDManager.getID(), ms, pm, true, false, (QueuedExecutor)pool.get(), -1, null, tr);
+ queues[11] =
+ new LocalClusteredQueue(office2, 2, "sub12", channelIDManager.getID(), ms, pm,
+ true, false, (QueuedExecutor)pool.get(), -1, null, tr);
bindings[11] = office2.bindClusteredQueue(new SimpleCondition("topic2"), queues[11]);
- queues[12] = new LocalClusteredQueue(office2, 2, "sub13", channelIDManager.getID(), ms, pm, true, true, (QueuedExecutor)pool.get(), -1, null, tr);
+ queues[12] =
+ new LocalClusteredQueue(office2, 2, "sub13", channelIDManager.getID(), ms, pm,
+ true, true, (QueuedExecutor)pool.get(), -1, null, tr);
bindings[12] = office2.bindClusteredQueue(new SimpleCondition("topic2"), queues[12]);
- queues[13] = new LocalClusteredQueue(office1, 1, "sub14", channelIDManager.getID(), ms, pm, true, false, (QueuedExecutor)pool.get(), -1, null, tr);
+ queues[13] =
+ new LocalClusteredQueue(office1, 1, "sub14", channelIDManager.getID(), ms, pm,
+ true, false, (QueuedExecutor)pool.get(), -1, null, tr);
bindings[13] = office1.bindClusteredQueue(new SimpleCondition("topic2"), queues[13]);
- queues[14] = new LocalClusteredQueue(office1, 1, "sub15", channelIDManager.getID(), ms, pm, true, true, (QueuedExecutor)pool.get(), -1, null, tr);
+ queues[14] =
+ new LocalClusteredQueue(office1, 1, "sub15", channelIDManager.getID(), ms, pm,
+ true, true, (QueuedExecutor)pool.get(), -1, null, tr);
bindings[14] = office1.bindClusteredQueue(new SimpleCondition("topic2"), queues[14]);
- queues[15] = new LocalClusteredQueue(office1, 1, "sub16", channelIDManager.getID(), ms, pm, true, true, (QueuedExecutor)pool.get(), -1, null, tr);
+ queues[15] =
+ new LocalClusteredQueue(office1, 1, "sub16", channelIDManager.getID(), ms, pm,
+ true, true, (QueuedExecutor)pool.get(), -1, null, tr);
bindings[15] = office1.bindClusteredQueue(new SimpleCondition("topic2"), queues[15]);
SimpleReceiver[] receivers = new SimpleReceiver[16];
@@ -2282,41 +2385,22 @@
office2.stop();
}
-
}
}
- protected ClusteredPostOffice createClusteredPostOffice(int nodeId, String groupName) throws Exception
+ protected void setUp() throws Exception
{
- MessagePullPolicy pullPolicy = new NullMessagePullPolicy();
-
- FilterFactory ff = new SimpleFilterFactory();
-
- ClusterRouterFactory rf = new DefaultRouterFactory();
-
- FailoverMapper mapper = new DefaultFailoverMapper();
-
- ConditionFactory cf = new SimpleConditionFactory();
+ super.setUp();
+ }
- DefaultClusteredPostOffice postOffice =
- new DefaultClusteredPostOffice(sc.getDataSource(), sc.getTransactionManager(),
- sc.getClusteredPostOfficeSQLProperties(), true, nodeId,
- "Clustered", ms, pm, tr, ff, cf, pool,
- groupName,
- new NamedJChannelFactory(JGroupsUtil.getControlStackProperties(),
- JGroupsUtil.getDataStackProperties()),
- 5000, 5000, pullPolicy, rf, mapper, 1000);
-
- postOffice.start();
-
- return postOffice;
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
}
- // Private -------------------------------------------------------
-
-
+ // Private --------------------------------------------------------------------------------------
- // Inner classes -------------------------------------------------
+ // Inner classes --------------------------------------------------------------------------------
}
Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeWithDefaultRouterTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeWithDefaultRouterTest.java 2007-01-31 11:29:26 UTC (rev 2115)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeWithDefaultRouterTest.java 2007-01-31 11:29:48 UTC (rev 2116)
@@ -23,21 +23,9 @@
import java.util.List;
-import org.jboss.messaging.core.FilterFactory;
import org.jboss.messaging.core.plugin.contract.ClusteredPostOffice;
-import org.jboss.messaging.core.plugin.contract.ConditionFactory;
-import org.jboss.messaging.core.plugin.contract.FailoverMapper;
-import org.jboss.messaging.core.plugin.postoffice.Binding;
-import org.jboss.messaging.core.plugin.postoffice.cluster.ClusterRouterFactory;
-import org.jboss.messaging.core.plugin.postoffice.cluster.DefaultClusteredPostOffice;
-import org.jboss.messaging.core.plugin.postoffice.cluster.DefaultFailoverMapper;
-import org.jboss.messaging.core.plugin.postoffice.cluster.DefaultRouterFactory;
import org.jboss.messaging.core.plugin.postoffice.cluster.LocalClusteredQueue;
-import org.jboss.messaging.core.plugin.postoffice.cluster.MessagePullPolicy;
-import org.jboss.messaging.core.plugin.postoffice.cluster.NullMessagePullPolicy;
import org.jboss.test.messaging.core.SimpleCondition;
-import org.jboss.test.messaging.core.SimpleConditionFactory;
-import org.jboss.test.messaging.core.SimpleFilterFactory;
import org.jboss.test.messaging.core.SimpleReceiver;
import org.jboss.test.messaging.core.plugin.base.PostOfficeTestBase;
@@ -48,6 +36,7 @@
* A DefaultClusteredPostOfficeWithDefaultRouterTest
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
* @version <tt>$Revision$</tt>
*
* $Id$
@@ -55,20 +44,20 @@
*/
public class DefaultClusteredPostOfficeWithDefaultRouterTest extends PostOfficeTestBase
{
- // Constants -----------------------------------------------------
+ // Constants ------------------------------------------------------------------------------------
- // Static --------------------------------------------------------
+ // Static ---------------------------------------------------------------------------------------
- // Attributes ----------------------------------------------------
+ // Attributes -----------------------------------------------------------------------------------
- // Constructors --------------------------------------------------
+ // Constructors ---------------------------------------------------------------------------------
public DefaultClusteredPostOfficeWithDefaultRouterTest(String name)
{
super(name);
}
- // Public --------------------------------------------------------
+ // Public ---------------------------------------------------------------------------------------
public void setUp() throws Exception
{
@@ -116,40 +105,50 @@
try
{
- office1 = createClusteredPostOffice(1, "testgroup");
+ office1 = createClusteredPostOffice(1, "testgroup", sc, ms, pm, tr, pool);
- office2 = createClusteredPostOffice(2, "testgroup");
+ office2 = createClusteredPostOffice(2, "testgroup", sc, ms, pm, tr, pool);
- office3 = createClusteredPostOffice(3, "testgroup");
+ office3 = createClusteredPostOffice(3, "testgroup", sc, ms, pm, tr, pool);
- office4 = createClusteredPostOffice(4, "testgroup");
+ office4 = createClusteredPostOffice(4, "testgroup", sc, ms, pm, tr, pool);
- office5 = createClusteredPostOffice(5, "testgroup");
+ office5 = createClusteredPostOffice(5, "testgroup", sc, ms, pm, tr, pool);
- office6 = createClusteredPostOffice(6, "testgroup");
+ office6 = createClusteredPostOffice(6, "testgroup", sc, ms, pm, tr, pool);
- LocalClusteredQueue queue1 = new LocalClusteredQueue(office2, 2, "queue1", channelIDManager.getID(), ms, pm, true, false, (QueuedExecutor)pool.get(), -1, null, tr);
- Binding binding1 = office2.bindClusteredQueue(new SimpleCondition("topic"), queue1);
+ LocalClusteredQueue queue1 =
+ new LocalClusteredQueue(office2, 2, "queue1", channelIDManager.getID(), ms, pm,
+ true, false, (QueuedExecutor)pool.get(), -1, null, tr);
+ office2.bindClusteredQueue(new SimpleCondition("topic"), queue1);
SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
queue1.add(receiver1);
- LocalClusteredQueue queue2 = new LocalClusteredQueue(office3, 3, "queue1", channelIDManager.getID(), ms, pm, true, false, (QueuedExecutor)pool.get(), -1, null, tr);
- Binding binding2 = office3.bindClusteredQueue(new SimpleCondition("topic"), queue2);
+ LocalClusteredQueue queue2 =
+ new LocalClusteredQueue(office3, 3, "queue1", channelIDManager.getID(), ms, pm,
+ true, false, (QueuedExecutor)pool.get(), -1, null, tr);
+ office3.bindClusteredQueue(new SimpleCondition("topic"), queue2);
SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
queue2.add(receiver2);
- LocalClusteredQueue queue3 = new LocalClusteredQueue(office4, 4, "queue1", channelIDManager.getID(), ms, pm, true, false, (QueuedExecutor)pool.get(), -1, null, tr);
- Binding binding3 = office4.bindClusteredQueue(new SimpleCondition("topic"), queue3);
+ LocalClusteredQueue queue3 =
+ new LocalClusteredQueue(office4, 4, "queue1", channelIDManager.getID(), ms, pm,
+ true, false, (QueuedExecutor)pool.get(), -1, null, tr);
+ office4.bindClusteredQueue(new SimpleCondition("topic"), queue3);
SimpleReceiver receiver3 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
queue3.add(receiver3);
- LocalClusteredQueue queue4 = new LocalClusteredQueue(office5, 5, "queue1", channelIDManager.getID(), ms, pm, true, false, (QueuedExecutor)pool.get(), -1, null, tr);
- Binding binding4 = office5.bindClusteredQueue(new SimpleCondition("topic"), queue4);
+ LocalClusteredQueue queue4 =
+ new LocalClusteredQueue(office5, 5, "queue1", channelIDManager.getID(), ms, pm,
+ true, false, (QueuedExecutor)pool.get(), -1, null, tr);
+ office5.bindClusteredQueue(new SimpleCondition("topic"), queue4);
SimpleReceiver receiver4 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
queue4.add(receiver4);
- LocalClusteredQueue queue5 = new LocalClusteredQueue(office6, 6, "queue1", channelIDManager.getID(), ms, pm, true, false, (QueuedExecutor)pool.get(), -1, null, tr);
- Binding binding5 = office6.bindClusteredQueue(new SimpleCondition("topic"), queue5);
+ LocalClusteredQueue queue5 =
+ new LocalClusteredQueue(office6, 6, "queue1", channelIDManager.getID(), ms, pm,
+ true, false, (QueuedExecutor)pool.get(), -1, null, tr);
+ office6.bindClusteredQueue(new SimpleCondition("topic"), queue5);
SimpleReceiver receiver5 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
queue5.add(receiver5);
@@ -244,53 +243,53 @@
protected void local(boolean persistent) throws Throwable
{
ClusteredPostOffice office1 = null;
-
ClusteredPostOffice office2 = null;
-
ClusteredPostOffice office3 = null;
-
ClusteredPostOffice office4 = null;
-
ClusteredPostOffice office5 = null;
-
ClusteredPostOffice office6 = null;
try
{
- office1 = createClusteredPostOffice(1, "testgroup");
+ office1 = createClusteredPostOffice(1, "testgroup", sc, ms, pm, tr, pool);
+ office2 = createClusteredPostOffice(2, "testgroup", sc, ms, pm, tr, pool);
+ office3 = createClusteredPostOffice(3, "testgroup", sc, ms, pm, tr, pool);
+ office4 = createClusteredPostOffice(4, "testgroup", sc, ms, pm, tr, pool);
+ office5 = createClusteredPostOffice(5, "testgroup", sc, ms, pm, tr, pool);
+ office6 = createClusteredPostOffice(6, "testgroup", sc, ms, pm, tr, pool);
- office2 = createClusteredPostOffice(2, "testgroup");
-
- office3 = createClusteredPostOffice(3, "testgroup");
-
- office4 = createClusteredPostOffice(4, "testgroup");
-
- office5 = createClusteredPostOffice(5, "testgroup");
-
- office6 = createClusteredPostOffice(6, "testgroup");
-
- LocalClusteredQueue queue1 = new LocalClusteredQueue(office2, 2, "queue1", channelIDManager.getID(), ms, pm, true, false, (QueuedExecutor)pool.get(), -1, null, tr);
- Binding binding1 = office2.bindClusteredQueue(new SimpleCondition("topic"), queue1);
+ LocalClusteredQueue queue1 =
+ new LocalClusteredQueue(office2, 2, "queue1", channelIDManager.getID(), ms, pm,
+ true, false, (QueuedExecutor)pool.get(), -1, null, tr);
+ office2.bindClusteredQueue(new SimpleCondition("topic"), queue1);
SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
queue1.add(receiver1);
- LocalClusteredQueue queue2 = new LocalClusteredQueue(office3, 3, "queue1", channelIDManager.getID(), ms, pm, true, false, (QueuedExecutor)pool.get(), -1, null, tr);
- Binding binding2 = office3.bindClusteredQueue(new SimpleCondition("topic"), queue2);
+ LocalClusteredQueue queue2 =
+ new LocalClusteredQueue(office3, 3, "queue1", channelIDManager.getID(), ms, pm,
+ true, false, (QueuedExecutor)pool.get(), -1, null, tr);
+ office3.bindClusteredQueue(new SimpleCondition("topic"), queue2);
SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
queue2.add(receiver2);
- LocalClusteredQueue queue3 = new LocalClusteredQueue(office4, 4, "queue1", channelIDManager.getID(), ms, pm, true, false, (QueuedExecutor)pool.get(), -1, null, tr);
- Binding binding3 = office4.bindClusteredQueue(new SimpleCondition("topic"), queue3);
+ LocalClusteredQueue queue3 =
+ new LocalClusteredQueue(office4, 4, "queue1", channelIDManager.getID(), ms, pm,
+ true, false, (QueuedExecutor)pool.get(), -1, null, tr);
+ office4.bindClusteredQueue(new SimpleCondition("topic"), queue3);
SimpleReceiver receiver3 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
queue3.add(receiver3);
- LocalClusteredQueue queue4 = new LocalClusteredQueue(office5, 5, "queue1", channelIDManager.getID(), ms, pm, true, false, (QueuedExecutor)pool.get(), -1, null, tr);
- Binding binding4 = office5.bindClusteredQueue(new SimpleCondition("topic"), queue4);
+ LocalClusteredQueue queue4 =
+ new LocalClusteredQueue(office5, 5, "queue1", channelIDManager.getID(), ms, pm,
+ true, false, (QueuedExecutor)pool.get(), -1, null, tr);
+ office5.bindClusteredQueue(new SimpleCondition("topic"), queue4);
SimpleReceiver receiver4 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
queue4.add(receiver4);
- LocalClusteredQueue queue5 = new LocalClusteredQueue(office6, 6, "queue1", channelIDManager.getID(), ms, pm, true, false, (QueuedExecutor)pool.get(), -1, null, tr);
- Binding binding5 = office6.bindClusteredQueue(new SimpleCondition("topic"), queue5);
+ LocalClusteredQueue queue5 =
+ new LocalClusteredQueue(office6, 6, "queue1", channelIDManager.getID(), ms, pm,
+ true, false, (QueuedExecutor)pool.get(), -1, null, tr);
+ office6.bindClusteredQueue(new SimpleCondition("topic"), queue5);
SimpleReceiver receiver5 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
queue5.add(receiver5);
@@ -373,40 +372,10 @@
}
}
-
-
- protected ClusteredPostOffice createClusteredPostOffice(int nodeId, String groupName) throws Exception
- {
- MessagePullPolicy redistPolicy = new NullMessagePullPolicy();
-
- FilterFactory ff = new SimpleFilterFactory();
-
- ClusterRouterFactory rf = new DefaultRouterFactory();
-
- FailoverMapper mapper = new DefaultFailoverMapper();
-
- ConditionFactory cf = new SimpleConditionFactory();
+ // Private --------------------------------------------------------------------------------------
- DefaultClusteredPostOffice postOffice =
- new DefaultClusteredPostOffice(sc.getDataSource(), sc.getTransactionManager(),
- sc.getClusteredPostOfficeSQLProperties(), true, nodeId,
- "Clustered", ms, pm, tr, ff, cf, pool,
- groupName,
- new NamedJChannelFactory(JGroupsUtil.getControlStackProperties(),
- JGroupsUtil.getDataStackProperties()),
- 5000, 5000, redistPolicy, rf, mapper, 1000);
+ // Inner classes --------------------------------------------------------------------------------
- postOffice.start();
-
- return postOffice;
- }
-
- // Private -------------------------------------------------------
-
-
- // Inner classes -------------------------------------------------
-
-
}
Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultRouterTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultRouterTest.java 2007-01-31 11:29:26 UTC (rev 2115)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultRouterTest.java 2007-01-31 11:29:48 UTC (rev 2116)
@@ -28,35 +28,20 @@
import org.jboss.messaging.core.Delivery;
import org.jboss.messaging.core.DeliveryObserver;
import org.jboss.messaging.core.Filter;
-import org.jboss.messaging.core.FilterFactory;
import org.jboss.messaging.core.Message;
import org.jboss.messaging.core.MessageReference;
import org.jboss.messaging.core.Receiver;
import org.jboss.messaging.core.SimpleDelivery;
-import org.jboss.messaging.core.plugin.contract.ClusteredPostOffice;
-import org.jboss.messaging.core.plugin.contract.ConditionFactory;
-import org.jboss.messaging.core.plugin.contract.FailoverMapper;
import org.jboss.messaging.core.plugin.postoffice.cluster.ClusterRouter;
-import org.jboss.messaging.core.plugin.postoffice.cluster.ClusterRouterFactory;
import org.jboss.messaging.core.plugin.postoffice.cluster.ClusteredQueue;
-import org.jboss.messaging.core.plugin.postoffice.cluster.DefaultClusteredPostOffice;
-import org.jboss.messaging.core.plugin.postoffice.cluster.DefaultFailoverMapper;
import org.jboss.messaging.core.plugin.postoffice.cluster.DefaultRouter;
-import org.jboss.messaging.core.plugin.postoffice.cluster.DefaultRouterFactory;
-import org.jboss.messaging.core.plugin.postoffice.cluster.MessagePullPolicy;
-import org.jboss.messaging.core.plugin.postoffice.cluster.NullMessagePullPolicy;
import org.jboss.messaging.core.plugin.postoffice.cluster.QueueStats;
import org.jboss.messaging.core.tx.Transaction;
-import org.jboss.test.messaging.core.SimpleConditionFactory;
-import org.jboss.test.messaging.core.SimpleFilterFactory;
import org.jboss.test.messaging.core.SimpleReceiver;
import org.jboss.test.messaging.core.plugin.base.PostOfficeTestBase;
import org.jboss.test.messaging.util.CoreMessageFactory;
/**
- *
- * A DefaultRouterTest
- *
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @version <tt>$Revision$</tt>
*
@@ -346,32 +331,6 @@
// queue.removeAllReferences();
// }
- protected ClusteredPostOffice createClusteredPostOffice(int nodeId, String groupName) throws Exception
- {
- MessagePullPolicy redistPolicy = new NullMessagePullPolicy();
-
- FilterFactory ff = new SimpleFilterFactory();
-
- ClusterRouterFactory rf = new DefaultRouterFactory();
-
- FailoverMapper mapper = new DefaultFailoverMapper();
-
- ConditionFactory cf = new SimpleConditionFactory();
-
- DefaultClusteredPostOffice postOffice =
- new DefaultClusteredPostOffice(sc.getDataSource(), sc.getTransactionManager(),
- sc.getClusteredPostOfficeSQLProperties(), true, nodeId,
- "Clustered", ms, pm, tr, ff, cf, pool,
- groupName,
- new NamedJChannelFactory(JGroupsUtil.getControlStackProperties(),
- JGroupsUtil.getDataStackProperties()),
- 5000, 5000, redistPolicy, rf, mapper, 1000);
-
- postOffice.start();
-
- return postOffice;
- }
-
// Private -------------------------------------------------------
Deleted: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/JGroupsUtil.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/JGroupsUtil.java 2007-01-31 11:29:26 UTC (rev 2115)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/JGroupsUtil.java 2007-01-31 11:29:48 UTC (rev 2116)
@@ -1,127 +0,0 @@
-/**
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-
-package org.jboss.test.messaging.core.plugin.postoffice.cluster;
-
-import org.jboss.logging.Logger;
-
-/**
- * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @version <tt>$Revision: 1252 $</tt>
- *
- * $Id: JGroupsUtil.java 1252 2006-09-01 22:07:43Z timfox $
- */
-public class JGroupsUtil
-{
- // Constants -----------------------------------------------------
-
- private static final Logger log = Logger.getLogger(JGroupsUtil.class);
-
-
- // Static --------------------------------------------------------
-
-
- public static String getDataStackProperties()
- {
- String host = System.getProperty("test.bind.address");
-
- log.info("test.bind.address is " + host);
-
- if (host == null)
- {
- host = "localhost";
- }
-
- return "UDP(mcast_recv_buf_size=500000;down_thread=false;ip_mcast=true;mcast_send_buf_size=32000;"+
- "mcast_port=45567;ucast_recv_buf_size=500000;use_incoming_packet_handler=false;"+
- "mcast_addr=228.8.8.8;use_outgoing_packet_handler=true;loopback=true;ucast_send_buf_size=32000;ip_ttl=32;"+
- "bind_addr=" + host + "):"+
- "AUTOCONF(down_thread=false;up_thread=false):"+
- "PING(timeout=2000;down_thread=false;num_initial_members=3;up_thread=false):"+
- "MERGE2(max_interval=10000;down_thread=false;min_interval=5000;up_thread=false):"+
- "FD_SOCK(down_thread=false;up_thread=false):"+
- "FD(timeout=20000;max_tries=3;down_thread=false;up_thread=false;shun=true):"+
- "VERIFY_SUSPECT(timeout=1500;down_thread=false;up_thread=false):"+
- "pbcast.NAKACK(max_xmit_size=8192;down_thread=false;use_mcast_xmit=true;gc_lag=50;up_thread=false;"+
- "retransmit_timeout=100,200,600,1200,2400,4800):"+
- "UNICAST(timeout=1200,2400,3600;down_thread=false;up_thread=false):"+
- "pbcast.STABLE(stability_delay=1000;desired_avg_gossip=20000;down_thread=false;max_bytes=0;up_thread=false):"+
- "FRAG(frag_size=8192;down_thread=false;up_thread=false):"+
- "VIEW_SYNC(avg_send_interval=60000;down_thread=false;up_thread=false):"+
- "pbcast.GMS(print_local_addr=true;join_timeout=3000;down_thread=false;join_retry_timeout=2000;up_thread=false;shun=true)";
- }
-
-
-
- /*
- * The control stack is used for sending control messages and maintaining state
- * It must be reliable and have the state protocol enabled
- */
- public static String getControlStackProperties()
- {
- String host = System.getProperty("test.bind.address");
-
- log.info("test.bind.address is " + host);
-
- if (host == null)
- {
- host = "localhost";
- }
-
-
-
- return "UDP(mcast_recv_buf_size=500000;down_thread=false;ip_mcast=true;mcast_send_buf_size=32000;"+
- "mcast_port=45568;ucast_recv_buf_size=500000;use_incoming_packet_handler=false;"+
- "mcast_addr=228.8.8.8;use_outgoing_packet_handler=true;loopback=true;ucast_send_buf_size=32000;ip_ttl=32;"+
- "bind_addr=" + host + "):"+
- "AUTOCONF(down_thread=false;up_thread=false):"+
- "PING(timeout=2000;down_thread=false;num_initial_members=3;up_thread=false):"+
- "MERGE2(max_interval=10000;down_thread=false;min_interval=5000;up_thread=false):"+
- "FD_SOCK(down_thread=false;up_thread=false):"+
- "FD(timeout=20000;max_tries=3;down_thread=false;up_thread=false;shun=true):"+
- "VERIFY_SUSPECT(timeout=1500;down_thread=false;up_thread=false):"+
- "pbcast.NAKACK(max_xmit_size=8192;down_thread=false;use_mcast_xmit=true;gc_lag=50;up_thread=false;"+
- "retransmit_timeout=100,200,600,1200,2400,4800):"+
- "UNICAST(timeout=1200,2400,3600;down_thread=false;up_thread=false):"+
- "pbcast.STABLE(stability_delay=1000;desired_avg_gossip=20000;down_thread=false;max_bytes=0;up_thread=false):"+
- "FRAG(frag_size=8192;down_thread=false;up_thread=false):"+
- "VIEW_SYNC(avg_send_interval=60000;down_thread=false;up_thread=false):"+
- "pbcast.GMS(print_local_addr=true;join_timeout=3000;down_thread=false;join_retry_timeout=2000;up_thread=false;shun=true):"+
- "pbcast.STATE_TRANSFER(down_thread=false;up_thread=false)";
- }
-
- // Attributes ----------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-}
Deleted: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/NamedJChannelFactory.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/NamedJChannelFactory.java 2007-01-31 11:29:26 UTC (rev 2115)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/NamedJChannelFactory.java 2007-01-31 11:29:48 UTC (rev 2116)
@@ -1,97 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.test.messaging.core.plugin.postoffice.cluster;
-
-import org.jgroups.JChannel;
-import org.jboss.messaging.core.plugin.postoffice.cluster.jchannelfactory.JChannelFactory;
-
-/**
- * A JChannelFactory that will use config names (from jchannelfactory)
- * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
- * @version <tt>$Revision$</tt>
- * <p/>
- * $Id$
- */
-public class NamedJChannelFactory implements JChannelFactory
-{
-
- // Constants
-
- // Attributes
-
- String asyncConfig;
- String syncConfig;
-
- // Static
-
- // Constructors
-
- public NamedJChannelFactory(String syncConfig, String asyncConfig)
- {
- this.syncConfig = syncConfig;
- this.asyncConfig = asyncConfig;
- }
-
- // Public
-
- public String getAsyncConfig()
- {
- return asyncConfig;
- }
-
- public void setAsyncConfig(String asyncConfig)
- {
- this.asyncConfig = asyncConfig;
- }
-
- public String getSyncConfig()
- {
- return syncConfig;
- }
-
- public void setSyncConfig(String syncConfig)
- {
- this.syncConfig = syncConfig;
- }
-
- // JChannelFactory implementation
-
- public JChannel createSyncChannel() throws Exception
- {
- return new JChannel(syncConfig);
- }
-
- public JChannel createASyncChannel() throws Exception
- {
- return new JChannel(asyncConfig);
- }
-
- // Package protected
-
- // Protected
-
- // Private
-
- // Inner classes
-
-}
Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RecoveryTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RecoveryTest.java 2007-01-31 11:29:26 UTC (rev 2115)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RecoveryTest.java 2007-01-31 11:29:48 UTC (rev 2116)
@@ -23,25 +23,13 @@
import java.util.List;
-import org.jboss.messaging.core.FilterFactory;
import org.jboss.messaging.core.Message;
import org.jboss.messaging.core.MessageReference;
-import org.jboss.messaging.core.plugin.contract.ClusteredPostOffice;
-import org.jboss.messaging.core.plugin.contract.ConditionFactory;
-import org.jboss.messaging.core.plugin.contract.FailoverMapper;
-import org.jboss.messaging.core.plugin.postoffice.Binding;
-import org.jboss.messaging.core.plugin.postoffice.cluster.ClusterRouterFactory;
import org.jboss.messaging.core.plugin.postoffice.cluster.DefaultClusteredPostOffice;
-import org.jboss.messaging.core.plugin.postoffice.cluster.DefaultFailoverMapper;
-import org.jboss.messaging.core.plugin.postoffice.cluster.DefaultRouterFactory;
import org.jboss.messaging.core.plugin.postoffice.cluster.LocalClusteredQueue;
-import org.jboss.messaging.core.plugin.postoffice.cluster.MessagePullPolicy;
-import org.jboss.messaging.core.plugin.postoffice.cluster.NullMessagePullPolicy;
import org.jboss.messaging.core.tx.Transaction;
import org.jboss.messaging.core.tx.TransactionException;
import org.jboss.test.messaging.core.SimpleCondition;
-import org.jboss.test.messaging.core.SimpleConditionFactory;
-import org.jboss.test.messaging.core.SimpleFilterFactory;
import org.jboss.test.messaging.core.SimpleReceiver;
import org.jboss.test.messaging.core.plugin.base.PostOfficeTestBase;
import org.jboss.test.messaging.util.CoreMessageFactory;
@@ -58,20 +46,20 @@
*/
public class RecoveryTest extends PostOfficeTestBase
{
- // Constants -----------------------------------------------------
+ // Constants ------------------------------------------------------------------------------------
- // Static --------------------------------------------------------
+ // Static ---------------------------------------------------------------------------------------
- // Attributes ----------------------------------------------------
+ // Attributes -----------------------------------------------------------------------------------
- // Constructors --------------------------------------------------
+ // Constructors ---------------------------------------------------------------------------------
public RecoveryTest(String name)
{
super(name);
}
- // Public --------------------------------------------------------
+ // Public ---------------------------------------------------------------------------------------
public void setUp() throws Exception
{
@@ -86,29 +74,31 @@
public void testCrashBeforePersist() throws Exception
{
DefaultClusteredPostOffice office1 = null;
-
DefaultClusteredPostOffice office2 = null;
-
DefaultClusteredPostOffice office3 = null;
try
{
- office1 = (DefaultClusteredPostOffice) createClusteredPostOffice(1, "testgroup");
+ office1 = (DefaultClusteredPostOffice)
+ createClusteredPostOffice(1, "testgroup", sc, ms, pm, tr, pool);
+ office2 = (DefaultClusteredPostOffice)
+ createClusteredPostOffice(2, "testgroup", sc, ms, pm, tr, pool);
+ office3 = (DefaultClusteredPostOffice)
+ createClusteredPostOffice(3, "testgroup", sc, ms, pm, tr, pool);
- office2 = (DefaultClusteredPostOffice) createClusteredPostOffice(2, "testgroup");
+ LocalClusteredQueue queue1 =
+ new LocalClusteredQueue(office1, 1, "queue1", channelIDManager.getID(), ms, pm,
+ true, true, (QueuedExecutor) pool.get(), -1, null, tr);
+ office1.bindClusteredQueue(new SimpleCondition("topic1"), queue1);
- office3 = (DefaultClusteredPostOffice) createClusteredPostOffice(3, "testgroup");
-
- LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, 1, "queue1", channelIDManager.getID(), ms, pm, true, true, (QueuedExecutor) pool.get(), -1, null, tr);
- Binding binding1 =
- office1.bindClusteredQueue(new SimpleCondition("topic1"), queue1);
-
- LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, 2, "queue2", channelIDManager.getID(), ms, pm, true, true, (QueuedExecutor) pool.get(), -1, null, tr);
- Binding binding2 =
+ LocalClusteredQueue queue2 =
+ new LocalClusteredQueue(office2, 2, "queue2", channelIDManager.getID(), ms, pm,
+ true, true, (QueuedExecutor) pool.get(), -1, null, tr);
office2.bindClusteredQueue(new SimpleCondition("topic1"), queue2);
- LocalClusteredQueue queue3 = new LocalClusteredQueue(office3, 3, "queue3", channelIDManager.getID(), ms, pm, true, true, (QueuedExecutor) pool.get(), -1, null, tr);
- Binding binding3 =
+ LocalClusteredQueue queue3 =
+ new LocalClusteredQueue(office3, 3, "queue3", channelIDManager.getID(), ms, pm,
+ true, true, (QueuedExecutor) pool.get(), -1, null, tr);
office3.bindClusteredQueue(new SimpleCondition("topic1"), queue3);
SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
@@ -221,30 +211,34 @@
public void testCrashAfterPersist() throws Exception
{
DefaultClusteredPostOffice office1 = null;
-
DefaultClusteredPostOffice office2 = null;
-
DefaultClusteredPostOffice office3 = null;
try
{
- office1 = (DefaultClusteredPostOffice) createClusteredPostOffice(1, "testgroup");
+ office1 = (DefaultClusteredPostOffice)
+ createClusteredPostOffice(1, "testgroup", sc, ms, pm, tr, pool);
- office2 = (DefaultClusteredPostOffice) createClusteredPostOffice(2, "testgroup");
+ office2 = (DefaultClusteredPostOffice)
+ createClusteredPostOffice(2, "testgroup", sc, ms, pm, tr, pool);
- office3 = (DefaultClusteredPostOffice) createClusteredPostOffice(3, "testgroup");
+ office3 = (DefaultClusteredPostOffice)
+ createClusteredPostOffice(3, "testgroup", sc, ms, pm, tr, pool);
- LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, 1, "queue1", channelIDManager.getID(), ms, pm, true, true, (QueuedExecutor) pool.get(), -1, null, tr);
- Binding binding1 =
- office1.bindClusteredQueue(new SimpleCondition("topic1"), queue1);
+ LocalClusteredQueue queue1 =
+ new LocalClusteredQueue(office1, 1, "queue1", channelIDManager.getID(), ms, pm,
+ true, true, (QueuedExecutor) pool.get(), -1, null, tr);
+ office1.bindClusteredQueue(new SimpleCondition("topic1"), queue1);
- LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, 2, "queue2", channelIDManager.getID(), ms, pm, true, true, (QueuedExecutor) pool.get(), -1, null, tr);
- Binding binding2 =
- office2.bindClusteredQueue(new SimpleCondition("topic1"), queue2);
+ LocalClusteredQueue queue2 =
+ new LocalClusteredQueue(office2, 2, "queue2", channelIDManager.getID(), ms, pm,
+ true, true, (QueuedExecutor) pool.get(), -1, null, tr);
+ office2.bindClusteredQueue(new SimpleCondition("topic1"), queue2);
- LocalClusteredQueue queue3 = new LocalClusteredQueue(office3, 3, "queue3", channelIDManager.getID(), ms, pm, true, true, (QueuedExecutor) pool.get(), -1, null, tr);
- Binding binding3 =
- office3.bindClusteredQueue(new SimpleCondition("topic1"), queue3);
+ LocalClusteredQueue queue3 =
+ new LocalClusteredQueue(office3, 3, "queue3", channelIDManager.getID(), ms, pm,
+ true, true, (QueuedExecutor) pool.get(), -1, null, tr);
+ office3.bindClusteredQueue(new SimpleCondition("topic1"), queue3);
SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
queue1.add(receiver1);
@@ -348,36 +342,10 @@
}
- protected ClusteredPostOffice createClusteredPostOffice(int nodeId, String groupName) throws Exception
- {
- MessagePullPolicy redistPolicy = new NullMessagePullPolicy();
+ // Private --------------------------------------------------------------------------------------
- FilterFactory ff = new SimpleFilterFactory();
+ // Inner classes --------------------------------------------------------------------------------
- ClusterRouterFactory rf = new DefaultRouterFactory();
-
- FailoverMapper mapper = new DefaultFailoverMapper();
-
- ConditionFactory cf = new SimpleConditionFactory();
-
- DefaultClusteredPostOffice postOffice =
- new DefaultClusteredPostOffice(sc.getDataSource(), sc.getTransactionManager(),
- sc.getClusteredPostOfficeSQLProperties(), true, nodeId, "Clustered",
- ms, pm, tr, ff, cf, pool,
- groupName,
- new NamedJChannelFactory(JGroupsUtil.getControlStackProperties(),
- JGroupsUtil.getDataStackProperties()),
- 5000, 5000, redistPolicy, rf, mapper, 1000);
-
- postOffice.start();
-
- return postOffice;
- }
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
}
Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RedistributionWithDefaultMessagePullPolicyTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RedistributionWithDefaultMessagePullPolicyTest.java 2007-01-31 11:29:26 UTC (rev 2115)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RedistributionWithDefaultMessagePullPolicyTest.java 2007-01-31 11:29:48 UTC (rev 2116)
@@ -28,26 +28,15 @@
import org.jboss.messaging.core.Channel;
import org.jboss.messaging.core.Delivery;
import org.jboss.messaging.core.DeliveryObserver;
-import org.jboss.messaging.core.FilterFactory;
import org.jboss.messaging.core.Message;
import org.jboss.messaging.core.MessageReference;
import org.jboss.messaging.core.Receiver;
import org.jboss.messaging.core.SimpleDelivery;
-import org.jboss.messaging.core.plugin.contract.ClusteredPostOffice;
-import org.jboss.messaging.core.plugin.contract.ConditionFactory;
-import org.jboss.messaging.core.plugin.contract.FailoverMapper;
-import org.jboss.messaging.core.plugin.postoffice.Binding;
-import org.jboss.messaging.core.plugin.postoffice.cluster.ClusterRouterFactory;
import org.jboss.messaging.core.plugin.postoffice.cluster.DefaultClusteredPostOffice;
-import org.jboss.messaging.core.plugin.postoffice.cluster.DefaultFailoverMapper;
+import org.jboss.messaging.core.plugin.postoffice.cluster.LocalClusteredQueue;
import org.jboss.messaging.core.plugin.postoffice.cluster.DefaultMessagePullPolicy;
-import org.jboss.messaging.core.plugin.postoffice.cluster.DefaultRouterFactory;
-import org.jboss.messaging.core.plugin.postoffice.cluster.LocalClusteredQueue;
-import org.jboss.messaging.core.plugin.postoffice.cluster.MessagePullPolicy;
import org.jboss.messaging.core.tx.Transaction;
import org.jboss.test.messaging.core.SimpleCondition;
-import org.jboss.test.messaging.core.SimpleConditionFactory;
-import org.jboss.test.messaging.core.SimpleFilterFactory;
import org.jboss.test.messaging.core.SimpleReceiver;
import org.jboss.test.messaging.core.plugin.base.PostOfficeTestBase;
import org.jboss.test.messaging.util.CoreMessageFactory;
@@ -59,26 +48,27 @@
* A RedistributionWithDefaultMessagePullPolicyTest
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
* @version <tt>$Revision$</tt>
*
* $Id$
*/
public class RedistributionWithDefaultMessagePullPolicyTest extends PostOfficeTestBase
{
- // Constants -----------------------------------------------------
+ // Constants ------------------------------------------------------------------------------------
- // Static --------------------------------------------------------
+ // Static ---------------------------------------------------------------------------------------
- // Attributes ----------------------------------------------------
+ // Attributes -----------------------------------------------------------------------------------
- // Constructors --------------------------------------------------
+ // Constructors ---------------------------------------------------------------------------------
public RedistributionWithDefaultMessagePullPolicyTest(String name)
{
super(name);
}
- // Public --------------------------------------------------------
+ // Public ---------------------------------------------------------------------------------------
public void setUp() throws Exception
{
@@ -133,23 +123,29 @@
public void testSimpleMessagePull() throws Throwable
{
DefaultClusteredPostOffice office1 = null;
-
DefaultClusteredPostOffice office2 = null;
try
{
- office1 = (DefaultClusteredPostOffice) createClusteredPostOffice(1, "testgroup");
+ office1 = (DefaultClusteredPostOffice)
+ createClusteredPostOffice(1, "testgroup", 10000, 10000, new DefaultMessagePullPolicy(),
+ sc, ms, pm, tr, pool);
- office2 = (DefaultClusteredPostOffice) createClusteredPostOffice(2, "testgroup");
+ office2 = (DefaultClusteredPostOffice)
+ createClusteredPostOffice(2, "testgroup", 10000, 10000, new DefaultMessagePullPolicy(),
+ sc, ms, pm, tr, pool);
- LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, 1, "queue1", channelIDManager.getID(), ms, pm, true, true, (QueuedExecutor) pool.get(), -1, null, tr);
- Binding binding1 =
- office1.bindClusteredQueue(new SimpleCondition("queue1"), queue1);
+ LocalClusteredQueue queue1 =
+ new LocalClusteredQueue(office1, 1, "queue1", channelIDManager.getID(), ms, pm,
+ true, true, (QueuedExecutor) pool.get(), -1, null, tr);
- LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, 2, "queue1", channelIDManager.getID(), ms, pm, true, true, (QueuedExecutor) pool.get(), -1, null, tr);
- Binding binding2 =
- office2.bindClusteredQueue(new SimpleCondition("queue1"), queue2);
+ office1.bindClusteredQueue(new SimpleCondition("queue1"), queue1);
+ LocalClusteredQueue queue2 =
+ new LocalClusteredQueue(office2, 2, "queue1", channelIDManager.getID(), ms, pm,
+ true, true, (QueuedExecutor) pool.get(), -1, null, tr);
+ office2.bindClusteredQueue(new SimpleCondition("queue1"), queue2);
+
Message msg = CoreMessageFactory.createCoreMessage(1);
msg.setReliable(true);
@@ -227,22 +223,27 @@
public void testSimpleMessagePullCrashBeforeCommit() throws Throwable
{
DefaultClusteredPostOffice office1 = null;
-
DefaultClusteredPostOffice office2 = null;
try
{
- office1 = (DefaultClusteredPostOffice) createClusteredPostOffice(1, "testgroup");
+ office1 = (DefaultClusteredPostOffice)
+ createClusteredPostOffice(1, "testgroup", 10000, 10000, new DefaultMessagePullPolicy(),
+ sc, ms, pm, tr, pool);
- office2 = (DefaultClusteredPostOffice) createClusteredPostOffice(2, "testgroup");
+ office2 = (DefaultClusteredPostOffice)
+ createClusteredPostOffice(2, "testgroup", 10000, 10000, new DefaultMessagePullPolicy(),
+ sc, ms, pm, tr, pool);
- LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, 1, "queue1", channelIDManager.getID(), ms, pm, true, true, (QueuedExecutor) pool.get(), -1, null, tr);
- Binding binding1 =
- office1.bindClusteredQueue(new SimpleCondition("queue1"), queue1);
+ LocalClusteredQueue queue1 =
+ new LocalClusteredQueue(office1, 1, "queue1", channelIDManager.getID(), ms, pm,
+ true, true, (QueuedExecutor) pool.get(), -1, null, tr);
+ office1.bindClusteredQueue(new SimpleCondition("queue1"), queue1);
- LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, 2, "queue1", channelIDManager.getID(), ms, pm, true, true, (QueuedExecutor) pool.get(), -1, null, tr);
- Binding binding2 =
- office2.bindClusteredQueue(new SimpleCondition("queue1"), queue2);
+ LocalClusteredQueue queue2 =
+ new LocalClusteredQueue(office2, 2, "queue1", channelIDManager.getID(), ms, pm,
+ true, true, (QueuedExecutor) pool.get(), -1, null, tr);
+ office2.bindClusteredQueue(new SimpleCondition("queue1"), queue2);
Message msg = CoreMessageFactory.createCoreMessage(1);
msg.setReliable(true);
@@ -327,22 +328,27 @@
public void testSimpleMessagePullCrashAfterCommit() throws Throwable
{
DefaultClusteredPostOffice office1 = null;
-
DefaultClusteredPostOffice office2 = null;
try
{
- office1 = (DefaultClusteredPostOffice) createClusteredPostOffice(1, "testgroup");
+ office1 = (DefaultClusteredPostOffice)
+ createClusteredPostOffice(1, "testgroup", 10000, 10000, new DefaultMessagePullPolicy(),
+ sc, ms, pm, tr, pool);
- office2 = (DefaultClusteredPostOffice) createClusteredPostOffice(2, "testgroup");
+ office2 = (DefaultClusteredPostOffice)
+ createClusteredPostOffice(2, "testgroup", 10000, 10000, new DefaultMessagePullPolicy(),
+ sc, ms, pm, tr, pool);
- LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, 1, "queue1", channelIDManager.getID(), ms, pm, true, true, (QueuedExecutor) pool.get(), -1, null, tr);
- Binding binding1 =
- office1.bindClusteredQueue(new SimpleCondition("queue1"), queue1);
+ LocalClusteredQueue queue1 =
+ new LocalClusteredQueue(office1, 1, "queue1", channelIDManager.getID(), ms, pm,
+ true, true, (QueuedExecutor) pool.get(), -1, null, tr);
+ office1.bindClusteredQueue(new SimpleCondition("queue1"), queue1);
- LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, 2, "queue1", channelIDManager.getID(), ms, pm, true, true, (QueuedExecutor) pool.get(), -1, null, tr);
- Binding binding2 =
- office2.bindClusteredQueue(new SimpleCondition("queue1"), queue2);
+ LocalClusteredQueue queue2 =
+ new LocalClusteredQueue(office2, 2, "queue1", channelIDManager.getID(), ms, pm,
+ true, true, (QueuedExecutor) pool.get(), -1, null, tr);
+ office2.bindClusteredQueue(new SimpleCondition("queue1"), queue2);
Message msg = CoreMessageFactory.createCoreMessage(1);
msg.setReliable(true);
@@ -423,22 +429,27 @@
public void testFailHandleMessagePullResult() throws Throwable
{
DefaultClusteredPostOffice office1 = null;
-
DefaultClusteredPostOffice office2 = null;
try
{
- office1 = (DefaultClusteredPostOffice) createClusteredPostOffice(1, "testgroup");
+ office1 = (DefaultClusteredPostOffice)
+ createClusteredPostOffice(1, "testgroup", 10000, 10000, new DefaultMessagePullPolicy(),
+ sc, ms, pm, tr, pool);
- office2 = (DefaultClusteredPostOffice) createClusteredPostOffice(2, "testgroup");
+ office2 = (DefaultClusteredPostOffice)
+ createClusteredPostOffice(2, "testgroup", 10000, 10000, new DefaultMessagePullPolicy(),
+ sc, ms, pm, tr, pool);
- LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, 1, "queue1", channelIDManager.getID(), ms, pm, true, true, (QueuedExecutor) pool.get(), -1, null, tr);
- Binding binding1 =
- office1.bindClusteredQueue(new SimpleCondition("queue1"), queue1);
+ LocalClusteredQueue queue1 =
+ new LocalClusteredQueue(office1, 1, "queue1", channelIDManager.getID(), ms, pm,
+ true, true, (QueuedExecutor) pool.get(), -1, null, tr);
+ office1.bindClusteredQueue(new SimpleCondition("queue1"), queue1);
- LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, 2, "queue1", channelIDManager.getID(), ms, pm, true, true, (QueuedExecutor) pool.get(), -1, null, tr);
- Binding binding2 =
- office2.bindClusteredQueue(new SimpleCondition("queue1"), queue2);
+ LocalClusteredQueue queue2 =
+ new LocalClusteredQueue(office2, 2, "queue1", channelIDManager.getID(), ms, pm,
+ true, true, (QueuedExecutor) pool.get(), -1, null, tr);
+ office2.bindClusteredQueue(new SimpleCondition("queue1"), queue2);
Message msg = CoreMessageFactory.createCoreMessage(1);
msg.setReliable(true);
@@ -510,41 +521,57 @@
protected void consumeAll(boolean persistent, boolean recoverable) throws Throwable
{
DefaultClusteredPostOffice office1 = null;
-
DefaultClusteredPostOffice office2 = null;
-
DefaultClusteredPostOffice office3 = null;
-
DefaultClusteredPostOffice office4 = null;
-
DefaultClusteredPostOffice office5 = null;
try
{
- office1 = (DefaultClusteredPostOffice) createClusteredPostOffice(1, "testgroup");
+ office1 = (DefaultClusteredPostOffice)
+ createClusteredPostOffice(1, "testgroup", 10000, 10000, new DefaultMessagePullPolicy(),
+ sc, ms, pm, tr, pool);
- office2 = (DefaultClusteredPostOffice) createClusteredPostOffice(2, "testgroup");
+ office2 = (DefaultClusteredPostOffice)
+ createClusteredPostOffice(2, "testgroup", 10000, 10000, new DefaultMessagePullPolicy(),
+ sc, ms, pm, tr, pool);
- office3 = (DefaultClusteredPostOffice) createClusteredPostOffice(3, "testgroup");
+ office3 = (DefaultClusteredPostOffice)
+ createClusteredPostOffice(3, "testgroup", 10000, 10000, new DefaultMessagePullPolicy(),
+ sc, ms, pm, tr, pool);
- office4 = (DefaultClusteredPostOffice) createClusteredPostOffice(4, "testgroup");
+ office4 = (DefaultClusteredPostOffice)
+ createClusteredPostOffice(4, "testgroup", 10000, 10000, new DefaultMessagePullPolicy(),
+ sc, ms, pm, tr, pool);
- office5 = (DefaultClusteredPostOffice) createClusteredPostOffice(5, "testgroup");
+ office5 = (DefaultClusteredPostOffice)
+ createClusteredPostOffice(5, "testgroup", 10000, 10000, new DefaultMessagePullPolicy(),
+ sc, ms, pm, tr, pool);
- LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, 1, "queue1", channelIDManager.getID(), ms, pm, true, recoverable, (QueuedExecutor) pool.get(), -1, null, tr);
- Binding binding1 = office1.bindClusteredQueue(new SimpleCondition("queue1"), queue1);
+ LocalClusteredQueue queue1 =
+ new LocalClusteredQueue(office1, 1, "queue1", channelIDManager.getID(), ms, pm,
+ true, recoverable, (QueuedExecutor) pool.get(), -1, null, tr);
+ office1.bindClusteredQueue(new SimpleCondition("queue1"), queue1);
- LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, 2, "queue1", channelIDManager.getID(), ms, pm, true, recoverable, (QueuedExecutor) pool.get(), -1, null, tr);
- Binding binding2 = office2.bindClusteredQueue(new SimpleCondition("queue1"), queue2);
+ LocalClusteredQueue queue2 =
+ new LocalClusteredQueue(office2, 2, "queue1", channelIDManager.getID(), ms, pm,
+ true, recoverable, (QueuedExecutor) pool.get(), -1, null, tr);
+ office2.bindClusteredQueue(new SimpleCondition("queue1"), queue2);
- LocalClusteredQueue queue3 = new LocalClusteredQueue(office3, 3, "queue1", channelIDManager.getID(), ms, pm, true, recoverable, (QueuedExecutor) pool.get(), -1, null, tr);
- Binding binding3 = office3.bindClusteredQueue(new SimpleCondition("queue1"), queue3);
+ LocalClusteredQueue queue3 =
+ new LocalClusteredQueue(office3, 3, "queue1", channelIDManager.getID(), ms, pm,
+ true, recoverable, (QueuedExecutor) pool.get(), -1, null, tr);
+ office3.bindClusteredQueue(new SimpleCondition("queue1"), queue3);
- LocalClusteredQueue queue4 = new LocalClusteredQueue(office4, 4, "queue1", channelIDManager.getID(), ms, pm, true, recoverable, (QueuedExecutor) pool.get(), -1, null, tr);
- Binding binding4 = office4.bindClusteredQueue(new SimpleCondition("queue1"), queue4);
+ LocalClusteredQueue queue4 =
+ new LocalClusteredQueue(office4, 4, "queue1", channelIDManager.getID(), ms, pm,
+ true, recoverable, (QueuedExecutor) pool.get(), -1, null, tr);
+ office4.bindClusteredQueue(new SimpleCondition("queue1"), queue4);
- LocalClusteredQueue queue5 = new LocalClusteredQueue(office5, 5, "queue1", channelIDManager.getID(), ms, pm, true, recoverable, (QueuedExecutor) pool.get(), -1, null, tr);
- Binding binding5 = office5.bindClusteredQueue(new SimpleCondition("queue1"), queue5);
+ LocalClusteredQueue queue5 =
+ new LocalClusteredQueue(office5, 5, "queue1", channelIDManager.getID(), ms, pm,
+ true, recoverable, (QueuedExecutor) pool.get(), -1, null, tr);
+ office5.bindClusteredQueue(new SimpleCondition("queue1"), queue5);
final int NUM_MESSAGES = 100;
@@ -673,41 +700,57 @@
protected void consumeBitByBit(boolean persistent, boolean recoverable) throws Throwable
{
DefaultClusteredPostOffice office1 = null;
-
DefaultClusteredPostOffice office2 = null;
-
DefaultClusteredPostOffice office3 = null;
-
DefaultClusteredPostOffice office4 = null;
-
DefaultClusteredPostOffice office5 = null;
try
{
- office1 = (DefaultClusteredPostOffice) createClusteredPostOffice(1, "testgroup");
+ office1 = (DefaultClusteredPostOffice)
+ createClusteredPostOffice(1, "testgroup", 10000, 10000, new DefaultMessagePullPolicy(),
+ sc, ms, pm, tr, pool);
- office2 = (DefaultClusteredPostOffice) createClusteredPostOffice(2, "testgroup");
+ office2 = (DefaultClusteredPostOffice)
+ createClusteredPostOffice(2, "testgroup", 10000, 10000, new DefaultMessagePullPolicy(),
+ sc, ms, pm, tr, pool);
- office3 = (DefaultClusteredPostOffice) createClusteredPostOffice(3, "testgroup");
+ office3 = (DefaultClusteredPostOffice)
+ createClusteredPostOffice(3, "testgroup", 10000, 10000, new DefaultMessagePullPolicy(),
+ sc, ms, pm, tr, pool);
- office4 = (DefaultClusteredPostOffice) createClusteredPostOffice(4, "testgroup");
+ office4 = (DefaultClusteredPostOffice)
+ createClusteredPostOffice(4, "testgroup", 10000, 10000, new DefaultMessagePullPolicy(),
+ sc, ms, pm, tr, pool);
- office5 = (DefaultClusteredPostOffice) createClusteredPostOffice(5, "testgroup");
+ office5 = (DefaultClusteredPostOffice)
+ createClusteredPostOffice(5, "testgroup", 10000, 10000, new DefaultMessagePullPolicy(),
+ sc, ms, pm, tr, pool);
- LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, 1, "queue1", channelIDManager.getID(), ms, pm, true, recoverable, (QueuedExecutor) pool.get(), -1, null, tr);
- Binding binding1 = office1.bindClusteredQueue(new SimpleCondition("queue1"), queue1);
+ LocalClusteredQueue queue1 =
+ new LocalClusteredQueue(office1, 1, "queue1", channelIDManager.getID(), ms, pm,
+ true, recoverable, (QueuedExecutor) pool.get(), -1, null, tr);
+ office1.bindClusteredQueue(new SimpleCondition("queue1"), queue1);
- LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, 2, "queue1", channelIDManager.getID(), ms, pm, true, recoverable, (QueuedExecutor) pool.get(), -1, null, tr);
- Binding binding2 = office2.bindClusteredQueue(new SimpleCondition("queue1"), queue2);
+ LocalClusteredQueue queue2 =
+ new LocalClusteredQueue(office2, 2, "queue1", channelIDManager.getID(), ms, pm,
+ true, recoverable, (QueuedExecutor) pool.get(), -1, null, tr);
+ office2.bindClusteredQueue(new SimpleCondition("queue1"), queue2);
- LocalClusteredQueue queue3 = new LocalClusteredQueue(office3, 3, "queue1", channelIDManager.getID(), ms, pm, true, recoverable, (QueuedExecutor) pool.get(), -1, null, tr);
- Binding binding3 = office3.bindClusteredQueue(new SimpleCondition("queue1"), queue3);
+ LocalClusteredQueue queue3 =
+ new LocalClusteredQueue(office3, 3, "queue1", channelIDManager.getID(), ms, pm,
+ true, recoverable, (QueuedExecutor) pool.get(), -1, null, tr);
+ office3.bindClusteredQueue(new SimpleCondition("queue1"), queue3);
- LocalClusteredQueue queue4 = new LocalClusteredQueue(office4, 4, "queue1", channelIDManager.getID(), ms, pm, true, recoverable, (QueuedExecutor) pool.get(), -1, null, tr);
- Binding binding4 = office4.bindClusteredQueue(new SimpleCondition("queue1"), queue4);
+ LocalClusteredQueue queue4 =
+ new LocalClusteredQueue(office4, 4, "queue1", channelIDManager.getID(), ms, pm,
+ true, recoverable, (QueuedExecutor) pool.get(), -1, null, tr);
+ office4.bindClusteredQueue(new SimpleCondition("queue1"), queue4);
- LocalClusteredQueue queue5 = new LocalClusteredQueue(office5, 5, "queue1", channelIDManager.getID(), ms, pm, true, recoverable, (QueuedExecutor) pool.get(), -1, null, tr);
- Binding binding5 = office5.bindClusteredQueue(new SimpleCondition("queue1"), queue5);
+ LocalClusteredQueue queue5 =
+ new LocalClusteredQueue(office5, 5, "queue1", channelIDManager.getID(), ms, pm,
+ true, recoverable, (QueuedExecutor) pool.get(), -1, null, tr);
+ office5.bindClusteredQueue(new SimpleCondition("queue1"), queue5);
final int NUM_MESSAGES = 100;
@@ -1147,36 +1190,10 @@
}
- protected ClusteredPostOffice createClusteredPostOffice(int nodeId, String groupName) throws Exception
- {
- MessagePullPolicy pullPolicy = new DefaultMessagePullPolicy();
+ // Private --------------------------------------------------------------------------------------
- FilterFactory ff = new SimpleFilterFactory();
+ // Inner classes --------------------------------------------------------------------------------
- ClusterRouterFactory rf = new DefaultRouterFactory();
-
- FailoverMapper mapper = new DefaultFailoverMapper();
-
- ConditionFactory cf = new SimpleConditionFactory();
-
- DefaultClusteredPostOffice postOffice =
- new DefaultClusteredPostOffice(sc.getDataSource(), sc.getTransactionManager(),
- sc.getClusteredPostOfficeSQLProperties(), true, nodeId,
- "Clustered", ms, pm, tr, ff, cf, pool,
- groupName,
- new NamedJChannelFactory(JGroupsUtil.getControlStackProperties(),
- JGroupsUtil.getDataStackProperties()),
- 10000, 10000, pullPolicy, rf, mapper, 1000);
-
- postOffice.start();
-
- return postOffice;
- }
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
}
Copied: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/SimpleJChannelFactory.java (from rev 2113, trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/NamedJChannelFactory.java)
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/SimpleJChannelFactory.java (rev 0)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/SimpleJChannelFactory.java 2007-01-31 11:29:48 UTC (rev 2116)
@@ -0,0 +1,97 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.test.messaging.core.plugin.postoffice.cluster;
+
+import org.jgroups.JChannel;
+import org.jboss.messaging.core.plugin.postoffice.cluster.jchannelfactory.JChannelFactory;
+
+/**
+ * A JChannelFactory that will use String JChannel configurations to create JChannel instances.
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * @version <tt>$Revision$</tt>
+ *
+ * $Id$
+ */
+public class SimpleJChannelFactory implements JChannelFactory
+{
+ // Constants ------------------------------------------------------------------------------------
+
+ // Static ---------------------------------------------------------------------------------------
+
+ // Attributes -----------------------------------------------------------------------------------
+
+ String asyncConfig; // handed to the JChannel constructor by createASyncChannel()
+ String syncConfig; // handed to the JChannel constructor by createSyncChannel()
+
+ // Constructors ---------------------------------------------------------------------------------
+
+ public SimpleJChannelFactory(String syncConfig, String asyncConfig) // sync config comes first
+ {
+ this.syncConfig = syncConfig;
+ this.asyncConfig = asyncConfig;
+ }
+
+ // JChannelFactory ------------------------------------------------------------------------------
+
+ public JChannel createSyncChannel() throws Exception // a new JChannel on every call
+ {
+ return new JChannel(syncConfig); // reads the field at call time, so setSyncConfig() is honoured
+ }
+
+ public JChannel createASyncChannel() throws Exception // a new JChannel on every call
+ {
+ return new JChannel(asyncConfig); // reads the field at call time, so setAsyncConfig() is honoured
+ }
+
+ // Public ---------------------------------------------------------------------------------------
+
+ public String getAsyncConfig() // returns the configuration used by createASyncChannel()
+ {
+ return asyncConfig;
+ }
+
+ public void setAsyncConfig(String asyncConfig) // only affects channels created after this call
+ {
+ this.asyncConfig = asyncConfig;
+ }
+
+ public String getSyncConfig() // returns the configuration used by createSyncChannel()
+ {
+ return syncConfig;
+ }
+
+ public void setSyncConfig(String syncConfig) // only affects channels created after this call
+ {
+ this.syncConfig = syncConfig;
+ }
+
+ // Package protected ----------------------------------------------------------------------------
+
+ // Protected ------------------------------------------------------------------------------------
+
+ // Private --------------------------------------------------------------------------------------
+
+ // Inner classes --------------------------------------------------------------------------------
+
+}
Property changes on: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/SimpleJChannelFactory.java
___________________________________________________________________
Name: svn:keywords
+ Id LastChangedDate Author Revision
Modified: trunk/tests/src/org/jboss/test/messaging/tools/jmx/ServiceContainer.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/tools/jmx/ServiceContainer.java 2007-01-31 11:29:26 UTC (rev 2115)
+++ trunk/tests/src/org/jboss/test/messaging/tools/jmx/ServiceContainer.java 2007-01-31 11:29:48 UTC (rev 2116)
@@ -206,6 +206,7 @@
private boolean jca;
private boolean remoting;
private boolean security;
+ private boolean multiplexer; // the JGroups channels multiplexer
private List toUnbindAtExit;
private String ipAddressOrHostName;
@@ -397,10 +398,12 @@
{
startTransactionManager();
}
+
if (database)
{
startInVMDatabase();
}
+
if (jca)
{
startCachedConnectionManager(CACHED_CONNECTION_MANAGER_OBJECT_NAME);
@@ -416,12 +419,14 @@
DEFAULTDS_MANAGED_CONNECTION_POOL_OBJECT_NAME);
startWrapperDataSourceService();
}
+
if (database && (transaction || jbossjta) && jca && cleanDatabase)
{
// We make sure the database is clean (only if we have all dependencies the database,
// othewise we'll get an access error)
deleteAllData();
}
+
if (xarecovery)
{
startRecoveryManager();
@@ -437,20 +442,22 @@
startSecurityManager();
}
+ if (multiplexer)
+ {
+ startMultiplexer();
+ }
+
loadJNDIContexts();
log.debug("loaded JNDI context");
- // aways install multiplexer as this is a cheap operation. The actual JChannels are started
- // only on demand
- startMultiplexer();
String transport = config.getRemotingTransport();
- log.info("Remoting type: ........... " + (remoting ? transport : "DISABLED"));
- log.info("Serialization type: ...... " + config.getSerializationType());
- log.info("Database: ................ " + config.getDatabaseType());
- log.info("Clustering mode: ......... " +
+ log.info("Remoting type: .............. " + (remoting ? transport : "DISABLED"));
+ log.info("Serialization type: ......... " + config.getSerializationType());
+ log.info("Database: ................... " + config.getDatabaseType());
+ log.info("Clustering mode: ............ " +
(this.isClustered() ? "CLUSTERED" : "NON-CLUSTERED"));
log.debug(this + " started");
@@ -464,6 +471,7 @@
public void stop() throws Exception
{
+
unloadJNDIContexts();
stopService(REMOTING_OBJECT_NAME);
@@ -562,7 +570,7 @@
String persistenceConfigFile =
"server/default/deploy/" + databaseType + "-persistence-service.xml";
- log.info("Peristence config file: .. " + persistenceConfigFile);
+ log.info("Persistence config file: .... " + persistenceConfigFile);
URL persistenceConfigFileURL = getClass().getClassLoader().getResource(persistenceConfigFile);
if (persistenceConfigFileURL == null)
@@ -647,7 +655,7 @@
"server/default/deploy/clustered-" + databaseType + "-persistence-service.xml";
}
- log.info("Peristence config file: .. " + persistenceConfigFile);
+ log.info("Persistence config file: .... " + persistenceConfigFile);
URL persistenceConfigFileURL = getClass().getClassLoader().getResource(persistenceConfigFile);
if (persistenceConfigFileURL == null)
@@ -773,6 +781,11 @@
mbeanServer.removeNotificationListener(on, listener);
}
+ public MBeanServer getMBeanServer()
+ {
+ return mbeanServer;
+ }
+
public void bindDefaultJMSProvider() throws Exception
{
JNDIProviderAdapter pa = new JNDIProviderAdapter();
@@ -1443,22 +1456,27 @@
}
catch (SQLException e)
{
- //Ignore - tables might not exist
+ // Ignore - tables might not exist
}
}
- private void startMultiplexer()
- throws Exception
+ private void startMultiplexer() throws Exception
{
- log.info("Starting multiplexer");
+ log.debug("Starting multiplexer");
+
String multiplexerConfigFile = "server/default/deploy/multiplexer-service.xml";
URL multiplexerCofigURL = getClass().getClassLoader().getResource(multiplexerConfigFile);
+
if (multiplexerCofigURL == null)
{
throw new Exception("Cannot find " + multiplexerCofigURL + " in the classpath");
}
- ServiceDeploymentDescriptor multiplexerDD = new ServiceDeploymentDescriptor(multiplexerCofigURL);
- List services = multiplexerDD.query("name","Multiplexer");
+
+ ServiceDeploymentDescriptor multiplexerDD =
+ new ServiceDeploymentDescriptor(multiplexerCofigURL);
+
+ List services = multiplexerDD.query("name", "Multiplexer");
+
if (services.isEmpty())
{
log.info("Couldn't find multiplexer config");
@@ -1468,7 +1486,8 @@
log.info("Could find multiplexer config");
}
- MBeanConfigurationElement multiplexerConfig = (MBeanConfigurationElement)services.iterator().next();
+ MBeanConfigurationElement multiplexerConfig =
+ (MBeanConfigurationElement)services.iterator().next();
ObjectName nameMultiplexer = registerAndConfigureService(multiplexerConfig);
invoke(nameMultiplexer,"create", new Object[0], new String[0]);
invoke(nameMultiplexer,"start", new Object[0], new String[0]);
@@ -1495,6 +1514,7 @@
jca = true;
remoting = true;
security = true;
+ multiplexer = true;
}
else if ("transaction".equals(tok))
{
@@ -1549,7 +1569,6 @@
{
remoting = false;
}
-
}
else if ("security".equals(tok))
{
@@ -1559,6 +1578,14 @@
security = false;
}
}
+ else if ("multiplexer".equals(tok))
+ {
+ multiplexer = true;
+ if (minus)
+ {
+ multiplexer = false;
+ }
+ }
else if ("none".equals(tok))
{
transaction = false;
@@ -1566,6 +1593,7 @@
jca = false;
remoting = false;
security = false;
+ multiplexer = false;
}
else
{
More information about the jboss-cvs-commits
mailing list