[jboss-cvs] JBoss Messaging SVN: r1457 - in trunk: . src/main/org/jboss/jms/server/destination src/main/org/jboss/messaging/core/plugin/contract src/main/org/jboss/messaging/core/plugin/postoffice src/main/org/jboss/messaging/core/plugin/postoffice/cluster tests/src/org/jboss/test/messaging/core/paging tests/src/org/jboss/test/messaging/core/paging/base tests/src/org/jboss/test/messaging/core/plugin tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster tests/src/org/jboss/test/messaging/jms
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Oct 9 16:39:34 EDT 2006
Author: timfox
Date: 2006-10-09 16:39:19 -0400 (Mon, 09 Oct 2006)
New Revision: 1457
Added:
trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeWithDefaultRouterTest.java
trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RecoveryTest.java
Modified:
trunk/.classpath
trunk/src/main/org/jboss/jms/server/destination/TopicService.java
trunk/src/main/org/jboss/messaging/core/plugin/contract/ClusteredPostOffice.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/CastMessagesCallback.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusterRouter.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultRouter.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RemoteQueueStub.java
trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_NP_NTTest.java
trunk/tests/src/org/jboss/test/messaging/core/paging/base/PagingStateTestBase.java
trunk/tests/src/org/jboss/test/messaging/core/plugin/JDBCPersistenceManagerTest.java
trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeTest.java
trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultRouterTest.java
trunk/tests/src/org/jboss/test/messaging/jms/ManualClusteringTest.java
Log:
http://jira.jboss.com/jira/browse/JBMESSAGING-573 and http://jira.jboss.com/jira/browse/JBMESSAGING-579 and a few other things
Modified: trunk/.classpath
===================================================================
--- trunk/.classpath 2006-10-09 19:00:59 UTC (rev 1456)
+++ trunk/.classpath 2006-10-09 20:39:19 UTC (rev 1457)
@@ -1,6 +1,5 @@
<?xml version="1.0" encoding="UTF-8"?>
<classpath>
- <classpathentry excluding="**/.svn/**/*" kind="src" path="perf/src"/>
<classpathentry kind="src" path="output/gen-parsers"/>
<classpathentry excluding="**/.svn/**/*" kind="src" path="src/main"/>
<classpathentry excluding="**/.svn/**/*" kind="src" path="tests/src"/>
@@ -21,8 +20,6 @@
<classpathentry kind="lib" path="tests/lib/jms-ra.jar"/>
<classpathentry kind="lib" path="tests/lib/mysql-connector-java-3.1.13-bin.jar"/>
<classpathentry kind="lib" path="thirdparty/jgroups/lib/jgroups.jar"/>
- <classpathentry kind="lib" path="perf/resources/jcommon-1.0.0-rc1.jar"/>
- <classpathentry kind="lib" path="perf/resources/jfreechart-1.0.0-rc1.jar"/>
<classpathentry kind="lib" path="thirdparty/apache-log4j/lib/log4j.jar"/>
<classpathentry kind="lib" path="thirdparty/junit/lib/junit.jar"/>
<classpathentry kind="lib" path="thirdparty/jboss/profiler/jvmti/lib/jboss-profiler-jvmti.jar"/>
Modified: trunk/src/main/org/jboss/jms/server/destination/TopicService.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/destination/TopicService.java 2006-10-09 19:00:59 UTC (rev 1456)
+++ trunk/src/main/org/jboss/jms/server/destination/TopicService.java 2006-10-09 20:39:19 UTC (rev 1457)
@@ -15,8 +15,10 @@
import org.jboss.jms.util.ExceptionUtil;
import org.jboss.jms.util.XMLUtil;
+import org.jboss.messaging.core.Queue;
import org.jboss.messaging.core.local.PagingFilteredQueue;
import org.jboss.messaging.core.plugin.postoffice.Binding;
+import org.jboss.messaging.core.plugin.postoffice.cluster.ClusteredQueue;
/**
* A deployable JBoss Messaging topic.
@@ -72,10 +74,10 @@
Binding binding = (Binding)iter.next();
PagingFilteredQueue queue = (PagingFilteredQueue)binding.getQueue();
-
+
queue.setPagingParams(destination.getFullSize(), destination.getPageSize(), destination.getDownCacheSize());
queue.load();
- queue.activate();
+ queue.activate();
}
dm.registerDestination(destination);
Modified: trunk/src/main/org/jboss/messaging/core/plugin/contract/ClusteredPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/contract/ClusteredPostOffice.java 2006-10-09 19:00:59 UTC (rev 1456)
+++ trunk/src/main/org/jboss/messaging/core/plugin/contract/ClusteredPostOffice.java 2006-10-09 20:39:19 UTC (rev 1457)
@@ -52,5 +52,5 @@
* @return
* @throws Throwable
*/
- Binding unbindClusteredQueue(String queueName) throws Throwable;
+ Binding unbindClusteredQueue(String queueName) throws Throwable;
}
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java 2006-10-09 19:00:59 UTC (rev 1456)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java 2006-10-09 20:39:19 UTC (rev 1457)
@@ -25,10 +25,12 @@
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Types;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -244,7 +246,9 @@
try
{
- Bindings cb = (Bindings)conditionMap.get(condition);
+ //We should only list the bindings for the local node
+
+ Bindings cb = (Bindings)conditionMap.get(condition);
if (cb == null)
{
@@ -252,7 +256,23 @@
}
else
{
- return cb.getAllBindings();
+ List list = new ArrayList();
+
+ Collection bindings = cb.getAllBindings();
+
+ Iterator iter = bindings.iterator();
+
+ while (iter.hasNext())
+ {
+ Binding binding = (Binding)iter.next();
+
+ if (binding.getNodeId() == this.nodeId)
+ {
+ list.add(binding);
+ }
+ }
+
+ return list;
}
}
finally
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/CastMessagesCallback.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/CastMessagesCallback.java 2006-10-09 19:00:59 UTC (rev 1456)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/CastMessagesCallback.java 2006-10-09 20:39:19 UTC (rev 1457)
@@ -27,6 +27,7 @@
import org.jboss.logging.Logger;
import org.jboss.messaging.core.Message;
+import org.jboss.messaging.core.tx.TransactionException;
import org.jboss.messaging.core.tx.TxCallback;
/**
@@ -52,6 +53,35 @@
* CastMessagesCallback.beforeCommit() - cast message(s) to holding areas
* JDBCPersistenceManager.TransactionCallback.beforeCommit() - persist message(s) in database
* CastMessagesCallback.afterCommit() - send "commit" message to holding areas
+ *
+ * Failure handling:
+ *
+ * If failure of the remote node occurs after casting the message to the holding area
+ * but before the message is persisted locally, then no recovery is necessary since the message wasn't persisted.
+ * In this case an exception should be propagated to the client.
+ *
+ * If failure of the remote node occurs after casting the message to the holding area
+ * and after the message has been persisted locally then, when the failover node takes
+ * over it will load the message from the db.
+ *
+ * If failure of the local node occurs before casting the message, no recovery is necessary and an exception
+ * should be thrown to the client.
+ *
+ * If failure of the local node occurs after casting the message, but before persisting the message.
+ * Then failure of the local node casues the remote node to check it's holding area - it will then check
+ * if the message has been persisted - in this case it has not so it will discard the tx from the holding area.
+ * (TODO is it possible that the cast of the original hold arrives *after* the receiving node knows the sending
+ * node has failed - we must be able to guarantee this never happens)
+ *
+ * If failure of the local node occurs after casting the message and after persisting the message in the database.
+ * Then failure of the local node casues the remote node to check it's holding area - it will then check
+ * if the message has been persisted - in this case it has so it will excute the transaction in memory which will add the
+ * message to the in memory queue.
+ *
+ *
+ *
+ *
+ *
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @version <tt>$Revision: 1.1 $</tt>
@@ -78,6 +108,10 @@
private boolean multicast;
private int toNodeId;
+
+ //Used in failure testing
+ private boolean failBeforeCommit;
+ private boolean failAfterCommit;
/*
* We store the id of one of the channels that the ref was inserted into
@@ -129,13 +163,17 @@
}
}
- CastMessagesCallback(int nodeId, long txId, PostOfficeInternal office)
+ CastMessagesCallback(int nodeId, long txId, PostOfficeInternal office, boolean failBeforeCommit, boolean failAfterCommit)
{
this.nodeId = nodeId;
this.txId = txId;
this.office = office;
+
+ this.failBeforeCommit = failBeforeCommit;
+
+ this.failAfterCommit = failAfterCommit;
}
public void afterCommit(boolean onePhase) throws Exception
@@ -150,6 +188,12 @@
if (persistent != null)
{
+ //Only used in testing
+ if (failAfterCommit)
+ {
+ throw new TransactionException("Forced failure for testing");
+ }
+
// Cast a commit message
ClusterRequest req = new SendTransactionRequest(nodeId, txId);
@@ -177,6 +221,12 @@
sendRequest(req);
}
+
+ //Only used in testing
+ if (failBeforeCommit)
+ {
+ throw new TransactionException("Forced failure for testing");
+ }
}
public void beforePrepare() throws Exception
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusterRouter.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusterRouter.java 2006-10-09 19:00:59 UTC (rev 1456)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusterRouter.java 2006-10-09 20:39:19 UTC (rev 1457)
@@ -38,5 +38,5 @@
{
List getQueues();
- LocalClusteredQueue getLocalQueue();
+ ClusteredQueue getLocalQueue();
}
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java 2006-10-09 19:00:59 UTC (rev 1456)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java 2006-10-09 20:39:19 UTC (rev 1457)
@@ -81,6 +81,10 @@
{
private static final Logger log = Logger.getLogger(DefaultClusteredPostOffice.class);
+ //Used for failure testing
+ private boolean failBeforeCommit;
+ private boolean failAfterCommit;
+
private boolean trace = log.isTraceEnabled();
private Channel syncChannel;
@@ -471,15 +475,14 @@
if (numberRemote == 1)
{
if (trace) { log.trace(this.nodeId + " unicasting message to " + lastNodeId); }
- //Unicast - only one node is interested in the message
-
+
+ //Unicast - only one node is interested in the message
asyncSendRequest(new MessageRequest(condition, ref.getMessage(), null), lastNodeId);
-
- //syncSendRequest(new MessageRequest(condition, ref.getMessage(), null), lastNodeId, false);
}
else
{
if (trace) { log.trace(this.nodeId + " multicasting message to group"); }
+
//Multicast - more than one node is interested
asyncSendRequest(new MessageRequest(condition, ref.getMessage(), queueNameNodeIdMap));
}
@@ -490,7 +493,7 @@
if (callback == null)
{
- callback = new CastMessagesCallback(nodeId, tx.getId(), DefaultClusteredPostOffice.this);
+ callback = new CastMessagesCallback(nodeId, tx.getId(), DefaultClusteredPostOffice.this, failBeforeCommit, failAfterCommit);
//This callback MUST be executed first
@@ -978,7 +981,7 @@
//Maybe the local queue now wants to pull message(s) from the remote queue given that the
//stats for the remote queue have changed
- LocalClusteredQueue localQueue = router.getLocalQueue();
+ LocalClusteredQueue localQueue = (LocalClusteredQueue)router.getLocalQueue();
if (localQueue != null)
{
@@ -1031,10 +1034,23 @@
return dels;
}
+
+ // Public ------------------------------------------------------------------------------------------
-
- // Public ------------------------------------------------------------------------------------------
-
+
+ //Used for testing only
+ public void setFail(boolean beforeCommit, boolean afterCommit)
+ {
+ this.failBeforeCommit = beforeCommit;
+ this.failAfterCommit = afterCommit;
+ }
+
+ //Used for testing only
+ public Collection getHoldingTransactions()
+ {
+ return holdingArea.values();
+ }
+
// Protected ---------------------------------------------------------------------------------------
protected void addToConditionMap(Binding binding)
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultRouter.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultRouter.java 2006-10-09 19:00:59 UTC (rev 1456)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultRouter.java 2006-10-09 20:39:19 UTC (rev 1457)
@@ -46,6 +46,8 @@
* In this case, with the assumption that producers and consumers are distributed evenly across the cluster
* then sending the message to the local queue is the most efficient policy.
*
+ * The exception to this if there are no consumers on the local queue.
+ *
* In the case of a durable subscription, there may well be no local queue since the durable subscription lives
* only on the number of nodes that it is looked up at.
*
@@ -66,7 +68,7 @@
//MUST be an arraylist for fast index access
private ArrayList queues;
- private LocalClusteredQueue localQueue;
+ private ClusteredQueue localQueue;
private int target;
@@ -80,7 +82,7 @@
return queues.size();
}
- public LocalClusteredQueue getLocalQueue()
+ public ClusteredQueue getLocalQueue()
{
return localQueue;
}
@@ -95,7 +97,7 @@
{
throw new IllegalStateException("Already has local queue");
}
- localQueue = (LocalClusteredQueue)queue;
+ localQueue = queue;
}
queues.add(queue);
@@ -148,8 +150,8 @@
if (trace) { log.trace(this + " routing ref " + reference); }
//Favour the local queue
-
- if (localQueue != null)
+
+ if (localQueue != null && localQueue.numberOfReceivers() > 0)
{
//The only time the local queue won't accept is if the selector doesn't
//match - in which case it won't match at any other nodes too so no point
@@ -163,24 +165,27 @@
}
else
{
- //There is no local shared queue
-
+ //There is no local shared queue or the local queue has no consumers
+
//We round robin among the rest
- if (!queues.isEmpty())
+ if ((localQueue == null && !queues.isEmpty()) || (localQueue != null && queues.size() > 1))
{
ClusteredQueue queue = (ClusteredQueue)queues.get(target);
+ if (queue == localQueue)
+ {
+ //We don't want to choose the local queue
+ incTarget();
+ }
+
+ queue = (ClusteredQueue)queues.get(target);
+
Delivery del = queue.handle(observer, reference, tx);
-
+
if (trace) { log.trace(this + " routed to remote queue, it returned " + del); }
- target++;
-
- if (target == queues.size())
- {
- target = 0;
- }
-
+ incTarget();
+
//Again, if the selector doesn't match then it won't on any others so no point trying them
return del;
}
@@ -191,6 +196,16 @@
return null;
}
+ private void incTarget()
+ {
+ target++;
+
+ if (target == queues.size())
+ {
+ target = 0;
+ }
+ }
+
public List getQueues()
{
return queues;
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RemoteQueueStub.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RemoteQueueStub.java 2006-10-09 19:00:59 UTC (rev 1456)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RemoteQueueStub.java 2006-10-09 20:39:19 UTC (rev 1457)
@@ -47,7 +47,7 @@
* $Id$
*
*/
-class RemoteQueueStub implements ClusteredQueue
+public class RemoteQueueStub implements ClusteredQueue
{
private static final Logger log = Logger.getLogger(RemoteQueueStub.class);
@@ -65,7 +65,8 @@
private QueueStats stats;
- RemoteQueueStub(int nodeId, String name, long id, boolean recoverable, PersistenceManager pm, Filter filter)
+ public RemoteQueueStub(int nodeId, String name, long id, boolean recoverable,
+ PersistenceManager pm, Filter filter)
{
this.nodeId = nodeId;
Modified: trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_NP_NTTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_NP_NTTest.java 2006-10-09 19:00:59 UTC (rev 1456)
+++ trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_NP_NTTest.java 2006-10-09 20:39:19 UTC (rev 1457)
@@ -291,10 +291,8 @@
//Now consume them all
- log.info("Consuming them all from 1");
this.consume(queue1, 0, refs1, 150);
- log.info("Consuming them all from 2");
this.consume(queue2, 0, refs2, 150);
// Queue1
Modified: trunk/tests/src/org/jboss/test/messaging/core/paging/base/PagingStateTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/paging/base/PagingStateTestBase.java 2006-10-09 19:00:59 UTC (rev 1456)
+++ trunk/tests/src/org/jboss/test/messaging/core/paging/base/PagingStateTestBase.java 2006-10-09 20:39:19 UTC (rev 1457)
@@ -465,9 +465,7 @@
Connection conn = ds.getConnection();
List msgIds = new ArrayList();
-
- //log.info("channel id:" + channelId);
-
+
String sql = "SELECT MESSAGEID, ORD, PAGE_ORD FROM JMS_MESSAGE_REFERENCE WHERE CHANNELID=? ORDER BY PAGE_ORD";
PreparedStatement ps = conn.prepareStatement(sql);
ps.setLong(1, channelId);
Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/JDBCPersistenceManagerTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/JDBCPersistenceManagerTest.java 2006-10-09 19:00:59 UTC (rev 1456)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/JDBCPersistenceManagerTest.java 2006-10-09 20:39:19 UTC (rev 1457)
@@ -685,9 +685,6 @@
//First load exactly 10
PersistenceManager.InitialLoadInfo info = pm.getInitialReferenceInfos(channel.getChannelID(), 10);
- log.info("min:" + info.getMinPageOrdering());
- log.info("max:" + info.getMaxPageOrdering());
-
assertNull(info.getMinPageOrdering());
assertNull(info.getMaxPageOrdering());
Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeTest.java 2006-10-09 19:00:59 UTC (rev 1456)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeTest.java 2006-10-09 20:39:19 UTC (rev 1457)
@@ -621,9 +621,7 @@
}
}
}
-
-
-
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
@@ -663,9 +661,7 @@
Message msg1 = CoreMessageFactory.createCoreMessage(1);
MessageReference ref1 = ms.reference(msg1);
- log.info("Sending message 1");
boolean routed = office1.route(ref1, "topic1", null);
- log.info("Sent message 1");
assertTrue(routed);
Added: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeWithDefaultRouterTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeWithDefaultRouterTest.java 2006-10-09 19:00:59 UTC (rev 1456)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeWithDefaultRouterTest.java 2006-10-09 20:39:19 UTC (rev 1457)
@@ -0,0 +1,556 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.test.messaging.core.plugin.postoffice.cluster;
+
+import java.util.List;
+
+import org.jboss.messaging.core.FilterFactory;
+import org.jboss.messaging.core.plugin.contract.ClusteredPostOffice;
+import org.jboss.messaging.core.plugin.postoffice.Binding;
+import org.jboss.messaging.core.plugin.postoffice.cluster.ClusterRouterFactory;
+import org.jboss.messaging.core.plugin.postoffice.cluster.DefaultClusteredPostOffice;
+import org.jboss.messaging.core.plugin.postoffice.cluster.DefaultRouterFactory;
+import org.jboss.messaging.core.plugin.postoffice.cluster.LocalClusteredQueue;
+import org.jboss.messaging.core.plugin.postoffice.cluster.MessagePullPolicy;
+import org.jboss.messaging.core.plugin.postoffice.cluster.NullMessagePullPolicy;
+import org.jboss.test.messaging.core.SimpleFilterFactory;
+import org.jboss.test.messaging.core.SimpleReceiver;
+import org.jboss.test.messaging.core.plugin.base.ClusteringTestBase;
+
+import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
+
+/**
+ *
+ * A DefaultClusteredPostOfficeWithDefaultRouterTest
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ *
+ * $Id$
+ *
+ */
+public class DefaultClusteredPostOfficeWithDefaultRouterTest extends ClusteringTestBase
+{
+ // Constants -----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public DefaultClusteredPostOfficeWithDefaultRouterTest(String name)
+ {
+ super(name);
+ }
+
+ // Public --------------------------------------------------------
+
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ }
+
+ public void tearDown() throws Exception
+ {
+ super.tearDown();
+ }
+
+ public void testNotLocalPersistent() throws Throwable
+ {
+ notLocal(true);
+ }
+
+ public void testNotLocalNonPersistent() throws Throwable
+ {
+ notLocal(false);
+ }
+
+ public void testLocalPersistent() throws Throwable
+ {
+ local(true);
+ }
+
+ public void testLocalNonPersistent() throws Throwable
+ {
+ local(false);
+ }
+
+
+ public void testLocalNonConsumersPersistent() throws Throwable
+ {
+ localNoConsumers(true);
+ }
+
+ public void testLocalNoConsumersNonPersistent() throws Throwable
+ {
+ localNoConsumers(false);
+ }
+
+
+ protected void notLocal(boolean persistent) throws Throwable
+ {
+ ClusteredPostOffice office1 = null;
+
+ ClusteredPostOffice office2 = null;
+
+ ClusteredPostOffice office3 = null;
+
+ ClusteredPostOffice office4 = null;
+
+ ClusteredPostOffice office5 = null;
+
+ ClusteredPostOffice office6 = null;
+
+ try
+ {
+ office1 = createClusteredPostOffice(1, "testgroup");
+
+ office2 = createClusteredPostOffice(2, "testgroup");
+
+ office3 = createClusteredPostOffice(3, "testgroup");
+
+ office4 = createClusteredPostOffice(4, "testgroup");
+
+ office5 = createClusteredPostOffice(5, "testgroup");
+
+ office6 = createClusteredPostOffice(6, "testgroup");
+
+ LocalClusteredQueue queue1 = new LocalClusteredQueue(office2, 2, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+ Binding binding1 = office2.bindClusteredQueue("topic", queue1);
+ SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+ queue1.add(receiver1);
+
+ LocalClusteredQueue queue2 = new LocalClusteredQueue(office3, 3, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+ Binding binding2 = office3.bindClusteredQueue("topic", queue2);
+ SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+ queue2.add(receiver2);
+
+ LocalClusteredQueue queue3 = new LocalClusteredQueue(office4, 4, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+ Binding binding3 = office4.bindClusteredQueue("topic", queue3);
+ SimpleReceiver receiver3 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+ queue3.add(receiver3);
+
+ LocalClusteredQueue queue4 = new LocalClusteredQueue(office5, 5, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+ Binding binding4 = office5.bindClusteredQueue("topic", queue4);
+ SimpleReceiver receiver4 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+ queue4.add(receiver4);
+
+ LocalClusteredQueue queue5 = new LocalClusteredQueue(office6, 6, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+ Binding binding5 = office6.bindClusteredQueue("topic", queue5);
+ SimpleReceiver receiver5 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+ queue5.add(receiver5);
+
+ List msgs = sendMessages("topic", persistent, office1, 1, null);
+ checkContainsAndAcknowledge(msgs, receiver1, queue1);
+ checkEmpty(receiver2);
+ checkEmpty(receiver3);
+ checkEmpty(receiver4);
+ checkEmpty(receiver5);
+
+ msgs = sendMessages("topic", persistent, office1, 1, null);
+ checkEmpty(receiver1);
+ checkContainsAndAcknowledge(msgs, receiver2, queue1);
+ checkEmpty(receiver3);
+ checkEmpty(receiver4);
+ checkEmpty(receiver5);
+
+ msgs = sendMessages("topic", persistent, office1, 1, null);
+ checkEmpty(receiver1);
+ checkEmpty(receiver2);
+ checkContainsAndAcknowledge(msgs, receiver3, queue1);
+ checkEmpty(receiver4);
+ checkEmpty(receiver5);
+
+ msgs = sendMessages("topic", persistent, office1, 1, null);
+ checkEmpty(receiver1);
+ checkEmpty(receiver2);
+ checkEmpty(receiver3);
+ checkContainsAndAcknowledge(msgs, receiver4, queue1);
+ checkEmpty(receiver5);
+
+ msgs = sendMessages("topic", persistent, office1, 1, null);
+ checkEmpty(receiver1);
+ checkEmpty(receiver2);
+ checkEmpty(receiver3);
+ checkEmpty(receiver4);
+ checkContainsAndAcknowledge(msgs, receiver5, queue1);
+
+ msgs = sendMessages("topic", persistent, office1, 1, null);
+ checkContainsAndAcknowledge(msgs, receiver1, queue1);
+ checkEmpty(receiver2);
+ checkEmpty(receiver3);
+ checkEmpty(receiver4);
+ checkEmpty(receiver5);
+
+ msgs = sendMessages("topic", persistent, office1, 1, null);
+ checkEmpty(receiver1);
+ checkContainsAndAcknowledge(msgs, receiver2, queue1);
+ checkEmpty(receiver3);
+ checkEmpty(receiver4);
+ checkEmpty(receiver5);
+
+
+ }
+ finally
+ {
+ if (office1 != null)
+ {
+ office1.stop();
+ }
+
+ if (office2 != null)
+ {
+ office2.stop();
+ }
+
+ if (office3 != null)
+ {
+ office3.stop();
+ }
+
+ if (office4 != null)
+ {
+ office4.stop();
+ }
+
+ if (office5 != null)
+ {
+ office5.stop();
+ }
+
+ if (office6 != null)
+ {
+ office6.stop();
+ }
+ }
+ }
+
+ //if the local queue has no consumers then we treat as if there was no local queue
+ protected void localNoConsumers(boolean persistent) throws Throwable
+ {
+ ClusteredPostOffice office1 = null;
+
+ ClusteredPostOffice office2 = null;
+
+ ClusteredPostOffice office3 = null;
+
+ ClusteredPostOffice office4 = null;
+
+ ClusteredPostOffice office5 = null;
+
+ ClusteredPostOffice office6 = null;
+
+ try
+ {
+ office1 = createClusteredPostOffice(1, "testgroup");
+
+ office2 = createClusteredPostOffice(2, "testgroup");
+
+ office3 = createClusteredPostOffice(3, "testgroup");
+
+ office4 = createClusteredPostOffice(4, "testgroup");
+
+ office5 = createClusteredPostOffice(5, "testgroup");
+
+ office6 = createClusteredPostOffice(6, "testgroup");
+
+ LocalClusteredQueue queueLocal = new LocalClusteredQueue(office1, 1, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+ Binding bindingLocal = office1.bindClusteredQueue("topic", queueLocal);
+ //No consumer on the local one
+
+ LocalClusteredQueue queue1 = new LocalClusteredQueue(office2, 2, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+ Binding binding1 = office2.bindClusteredQueue("topic", queue1);
+ SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+ queue1.add(receiver1);
+
+ LocalClusteredQueue queue2 = new LocalClusteredQueue(office3, 3, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+ Binding binding2 = office3.bindClusteredQueue("topic", queue2);
+ SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+ queue2.add(receiver2);
+
+ LocalClusteredQueue queue3 = new LocalClusteredQueue(office4, 4, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+ Binding binding3 = office4.bindClusteredQueue("topic", queue3);
+ SimpleReceiver receiver3 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+ queue3.add(receiver3);
+
+ LocalClusteredQueue queue4 = new LocalClusteredQueue(office5, 5, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+ Binding binding4 = office5.bindClusteredQueue("topic", queue4);
+ SimpleReceiver receiver4 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+ queue4.add(receiver4);
+
+ LocalClusteredQueue queue5 = new LocalClusteredQueue(office6, 6, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+ Binding binding5 = office6.bindClusteredQueue("topic", queue5);
+ SimpleReceiver receiver5 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+ queue5.add(receiver5);
+
+ List msgs = sendMessages("topic", persistent, office1, 1, null);
+ checkContainsAndAcknowledge(msgs, receiver1, queue1);
+ checkEmpty(receiver2);
+ checkEmpty(receiver3);
+ checkEmpty(receiver4);
+ checkEmpty(receiver5);
+
+ msgs = sendMessages("topic", persistent, office1, 1, null);
+ checkEmpty(receiver1);
+ checkContainsAndAcknowledge(msgs, receiver2, queue1);
+ checkEmpty(receiver3);
+ checkEmpty(receiver4);
+ checkEmpty(receiver5);
+
+ msgs = sendMessages("topic", persistent, office1, 1, null);
+ checkEmpty(receiver1);
+ checkEmpty(receiver2);
+ checkContainsAndAcknowledge(msgs, receiver3, queue1);
+ checkEmpty(receiver4);
+ checkEmpty(receiver5);
+
+ msgs = sendMessages("topic", persistent, office1, 1, null);
+ checkEmpty(receiver1);
+ checkEmpty(receiver2);
+ checkEmpty(receiver3);
+ checkContainsAndAcknowledge(msgs, receiver4, queue1);
+ checkEmpty(receiver5);
+
+ msgs = sendMessages("topic", persistent, office1, 1, null);
+ checkEmpty(receiver1);
+ checkEmpty(receiver2);
+ checkEmpty(receiver3);
+ checkEmpty(receiver4);
+ checkContainsAndAcknowledge(msgs, receiver5, queue1);
+
+ msgs = sendMessages("topic", persistent, office1, 1, null);
+ checkContainsAndAcknowledge(msgs, receiver1, queue1);
+ checkEmpty(receiver2);
+ checkEmpty(receiver3);
+ checkEmpty(receiver4);
+ checkEmpty(receiver5);
+
+ msgs = sendMessages("topic", persistent, office1, 1, null);
+ checkEmpty(receiver1);
+ checkContainsAndAcknowledge(msgs, receiver2, queue1);
+ checkEmpty(receiver3);
+ checkEmpty(receiver4);
+ checkEmpty(receiver5);
+
+
+ }
+ finally
+ {
+ if (office1 != null)
+ {
+ office1.stop();
+ }
+
+ if (office2 != null)
+ {
+ office2.stop();
+ }
+
+ if (office3 != null)
+ {
+ office3.stop();
+ }
+
+ if (office4 != null)
+ {
+ office4.stop();
+ }
+
+ if (office5 != null)
+ {
+ office5.stop();
+ }
+
+ if (office6 != null)
+ {
+ office6.stop();
+ }
+ }
+ }
+
+
+ protected void local(boolean persistent) throws Throwable
+ {
+ ClusteredPostOffice office1 = null;
+
+ ClusteredPostOffice office2 = null;
+
+ ClusteredPostOffice office3 = null;
+
+ ClusteredPostOffice office4 = null;
+
+ ClusteredPostOffice office5 = null;
+
+ ClusteredPostOffice office6 = null;
+
+ try
+ {
+ office1 = createClusteredPostOffice(1, "testgroup");
+
+ office2 = createClusteredPostOffice(2, "testgroup");
+
+ office3 = createClusteredPostOffice(3, "testgroup");
+
+ office4 = createClusteredPostOffice(4, "testgroup");
+
+ office5 = createClusteredPostOffice(5, "testgroup");
+
+ office6 = createClusteredPostOffice(6, "testgroup");
+
+ LocalClusteredQueue queue1 = new LocalClusteredQueue(office2, 2, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+ Binding binding1 = office2.bindClusteredQueue("topic", queue1);
+ SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+ queue1.add(receiver1);
+
+ LocalClusteredQueue queue2 = new LocalClusteredQueue(office3, 3, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+ Binding binding2 = office3.bindClusteredQueue("topic", queue2);
+ SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+ queue2.add(receiver2);
+
+ LocalClusteredQueue queue3 = new LocalClusteredQueue(office4, 4, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+ Binding binding3 = office4.bindClusteredQueue("topic", queue3);
+ SimpleReceiver receiver3 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+ queue3.add(receiver3);
+
+ LocalClusteredQueue queue4 = new LocalClusteredQueue(office5, 5, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+ Binding binding4 = office5.bindClusteredQueue("topic", queue4);
+ SimpleReceiver receiver4 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+ queue4.add(receiver4);
+
+ LocalClusteredQueue queue5 = new LocalClusteredQueue(office6, 6, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+ Binding binding5 = office6.bindClusteredQueue("topic", queue5);
+ SimpleReceiver receiver5 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+ queue5.add(receiver5);
+
+ List msgs = sendMessages("topic", persistent, office2, 3, null);
+ checkContainsAndAcknowledge(msgs, receiver1, queue1);
+ checkEmpty(receiver2);
+ checkEmpty(receiver3);
+ checkEmpty(receiver4);
+ checkEmpty(receiver5);
+
+ msgs = sendMessages("topic", persistent, office2, 3, null);
+ checkContainsAndAcknowledge(msgs, receiver1, queue1);
+ checkEmpty(receiver2);
+ checkEmpty(receiver3);
+ checkEmpty(receiver4);
+ checkEmpty(receiver5);
+
+ msgs = sendMessages("topic", persistent, office2, 3, null);
+ checkContainsAndAcknowledge(msgs, receiver1, queue1);
+ checkEmpty(receiver2);
+ checkEmpty(receiver3);
+ checkEmpty(receiver4);
+ checkEmpty(receiver5);
+
+
+ msgs = sendMessages("topic", persistent, office3, 3, null);
+ checkEmpty(receiver1);
+ checkContainsAndAcknowledge(msgs, receiver2, queue1);
+ checkEmpty(receiver3);
+ checkEmpty(receiver4);
+ checkEmpty(receiver5);
+
+ msgs = sendMessages("topic", persistent, office3, 3, null);
+ checkEmpty(receiver1);
+ checkContainsAndAcknowledge(msgs, receiver2, queue1);
+ checkEmpty(receiver3);
+ checkEmpty(receiver4);
+ checkEmpty(receiver5);
+
+ msgs = sendMessages("topic", persistent, office3, 3, null);
+ checkEmpty(receiver1);
+ checkContainsAndAcknowledge(msgs, receiver2, queue1);
+ checkEmpty(receiver3);
+ checkEmpty(receiver4);
+ checkEmpty(receiver5);
+
+
+ }
+ finally
+ {
+ if (office1 != null)
+ {
+ office1.stop();
+ }
+
+ if (office2 != null)
+ {
+ office2.stop();
+ }
+
+ if (office3 != null)
+ {
+ office3.stop();
+ }
+
+ if (office4 != null)
+ {
+ office4.stop();
+ }
+
+ if (office5 != null)
+ {
+ office5.stop();
+ }
+
+ if (office6 != null)
+ {
+ office6.stop();
+ }
+ }
+ }
+
+
+
+ protected ClusteredPostOffice createClusteredPostOffice(int nodeId, String groupName) throws Exception
+ {
+ MessagePullPolicy redistPolicy = new NullMessagePullPolicy();
+
+ FilterFactory ff = new SimpleFilterFactory();
+
+ ClusterRouterFactory rf = new DefaultRouterFactory();
+
+ DefaultClusteredPostOffice postOffice =
+ new DefaultClusteredPostOffice(sc.getDataSource(), sc.getTransactionManager(),
+ null, true, nodeId, "Clustered", ms, pm, tr, ff, pool,
+ groupName,
+ JGroupsUtil.getControlStackProperties(),
+ JGroupsUtil.getDataStackProperties(),
+ 5000, 5000, redistPolicy, rf, 1, 1000);
+
+ postOffice.start();
+
+ return postOffice;
+ }
+
+ // Private -------------------------------------------------------
+
+
+ // Inner classes -------------------------------------------------
+
+
+}
+
+
+
Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultRouterTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultRouterTest.java 2006-10-09 19:00:59 UTC (rev 1456)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultRouterTest.java 2006-10-09 20:39:19 UTC (rev 1457)
@@ -21,23 +21,33 @@
*/
package org.jboss.test.messaging.core.plugin.postoffice.cluster;
+import java.util.Iterator;
import java.util.List;
+import org.jboss.messaging.core.Delivery;
+import org.jboss.messaging.core.DeliveryObserver;
+import org.jboss.messaging.core.Filter;
import org.jboss.messaging.core.FilterFactory;
+import org.jboss.messaging.core.Message;
+import org.jboss.messaging.core.MessageReference;
+import org.jboss.messaging.core.Receiver;
+import org.jboss.messaging.core.SimpleDelivery;
import org.jboss.messaging.core.plugin.contract.ClusteredPostOffice;
-import org.jboss.messaging.core.plugin.postoffice.Binding;
+import org.jboss.messaging.core.plugin.postoffice.cluster.ClusterRouter;
import org.jboss.messaging.core.plugin.postoffice.cluster.ClusterRouterFactory;
+import org.jboss.messaging.core.plugin.postoffice.cluster.ClusteredQueue;
import org.jboss.messaging.core.plugin.postoffice.cluster.DefaultClusteredPostOffice;
+import org.jboss.messaging.core.plugin.postoffice.cluster.DefaultRouter;
import org.jboss.messaging.core.plugin.postoffice.cluster.DefaultRouterFactory;
-import org.jboss.messaging.core.plugin.postoffice.cluster.LocalClusteredQueue;
import org.jboss.messaging.core.plugin.postoffice.cluster.MessagePullPolicy;
import org.jboss.messaging.core.plugin.postoffice.cluster.NullMessagePullPolicy;
+import org.jboss.messaging.core.plugin.postoffice.cluster.QueueStats;
+import org.jboss.messaging.core.tx.Transaction;
import org.jboss.test.messaging.core.SimpleFilterFactory;
import org.jboss.test.messaging.core.SimpleReceiver;
import org.jboss.test.messaging.core.plugin.base.ClusteringTestBase;
+import org.jboss.test.messaging.util.CoreMessageFactory;
-import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
-
/**
*
* A DefaultRouterTest
@@ -75,299 +85,292 @@
super.tearDown();
}
- public void testNotLocalPersistent() throws Throwable
+ // The router only has a local queue with a consumer
+ public void testRouterOnlyLocalWithConsumer() throws Exception
{
- notLocal(true);
+ DefaultRouter dr = new DefaultRouter();
+
+ ClusteredQueue queue = new SimpleQueue(true);
+
+ SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+
+ queue.add(receiver1);
+
+ dr.add(queue);
+
+ sendAndCheck(dr, receiver1);
+
+ sendAndCheck(dr, receiver1);
+
+ sendAndCheck(dr, receiver1);
}
- public void testNotLocalNonPersistent() throws Throwable
+ //The router only has a local queue with no consumer
+ public void testRouterOnlyLocalNoConsumer() throws Exception
{
- notLocal(false);
+ DefaultRouter dr = new DefaultRouter();
+
+ ClusteredQueue queue = new SimpleQueue(true);
+
+ dr.add(queue);
+
+ Message msg = CoreMessageFactory.createCoreMessage(0, false, null);
+
+ MessageReference ref = ms.reference(msg);
+
+ Delivery del = dr.handle(null, ref, null);
+
+ assertNull(del);
+
}
- public void testLocalPersistent() throws Throwable
+ //The router has only one non local queues
+ public void testRouterOnlyOneNonLocal() throws Exception
{
- local(true);
+ DefaultRouter dr = new DefaultRouter();
+
+ ClusteredQueue queue = new SimpleQueue(false);
+
+ SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+
+ queue.add(receiver1);
+
+ dr.add(queue);
+
+ sendAndCheck(dr, receiver1);
+
+ sendAndCheck(dr, receiver1);
+
+ sendAndCheck(dr, receiver1);
}
- public void testLocalNonPersistent() throws Throwable
+ //The router has multiple non local queues and no local queue
+ public void testRouterMultipleNonLocal() throws Exception
{
- local(false);
+ DefaultRouter dr = new DefaultRouter();
+
+ ClusteredQueue remote1 = new SimpleQueue(false);
+
+ SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+
+ remote1.add(receiver1);
+
+ dr.add(remote1);
+
+
+ ClusteredQueue remote2 = new SimpleQueue(false);
+
+ SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+
+ remote2.add(receiver2);
+
+ dr.add(remote2);
+
+
+ ClusteredQueue remote3 = new SimpleQueue(false);
+
+ SimpleReceiver receiver3 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+
+ remote3.add(receiver3);
+
+ dr.add(remote3);
+
+ sendAndCheck(dr, receiver1);
+
+ sendAndCheck(dr, receiver2);
+
+ sendAndCheck(dr, receiver3);
+
+ sendAndCheck(dr, receiver1);
+
+ sendAndCheck(dr, receiver2);
+
+ sendAndCheck(dr, receiver3);
}
- protected void notLocal(boolean persistent) throws Throwable
+
+ // The router has one local with consumer and one non local queue
+ public void testRouterOneLocalWithConsumerOneNonLocal() throws Exception
{
- ClusteredPostOffice office1 = null;
+ DefaultRouter dr = new DefaultRouter();
+
+ ClusteredQueue remote1 = new SimpleQueue(false);
+
+ SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
- ClusteredPostOffice office2 = null;
+ remote1.add(receiver1);
- ClusteredPostOffice office3 = null;
+ dr.add(remote1);
- ClusteredPostOffice office4 = null;
+ ClusteredQueue queue = new SimpleQueue(true);
- ClusteredPostOffice office5 = null;
+ SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
- ClusteredPostOffice office6 = null;
-
- try
- {
- office1 = createClusteredPostOffice(1, "testgroup");
-
- office2 = createClusteredPostOffice(2, "testgroup");
-
- office3 = createClusteredPostOffice(3, "testgroup");
-
- office4 = createClusteredPostOffice(4, "testgroup");
-
- office5 = createClusteredPostOffice(5, "testgroup");
-
- office6 = createClusteredPostOffice(6, "testgroup");
-
- LocalClusteredQueue queue1 = new LocalClusteredQueue(office2, 2, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
- Binding binding1 = office2.bindClusteredQueue("topic", queue1);
- SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
- queue1.add(receiver1);
-
- LocalClusteredQueue queue2 = new LocalClusteredQueue(office3, 3, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
- Binding binding2 = office3.bindClusteredQueue("topic", queue2);
- SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
- queue2.add(receiver2);
-
- LocalClusteredQueue queue3 = new LocalClusteredQueue(office4, 4, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
- Binding binding3 = office4.bindClusteredQueue("topic", queue3);
- SimpleReceiver receiver3 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
- queue3.add(receiver3);
-
- LocalClusteredQueue queue4 = new LocalClusteredQueue(office5, 5, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
- Binding binding4 = office5.bindClusteredQueue("topic", queue4);
- SimpleReceiver receiver4 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
- queue4.add(receiver4);
-
- LocalClusteredQueue queue5 = new LocalClusteredQueue(office6, 6, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
- Binding binding5 = office6.bindClusteredQueue("topic", queue5);
- SimpleReceiver receiver5 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
- queue5.add(receiver5);
-
- List msgs = sendMessages("topic", persistent, office1, 1, null);
- checkContainsAndAcknowledge(msgs, receiver1, queue1);
- checkEmpty(receiver2);
- checkEmpty(receiver3);
- checkEmpty(receiver4);
- checkEmpty(receiver5);
-
- msgs = sendMessages("topic", persistent, office1, 1, null);
- checkEmpty(receiver1);
- checkContainsAndAcknowledge(msgs, receiver2, queue1);
- checkEmpty(receiver3);
- checkEmpty(receiver4);
- checkEmpty(receiver5);
-
- msgs = sendMessages("topic", persistent, office1, 1, null);
- checkEmpty(receiver1);
- checkEmpty(receiver2);
- checkContainsAndAcknowledge(msgs, receiver3, queue1);
- checkEmpty(receiver4);
- checkEmpty(receiver5);
-
- msgs = sendMessages("topic", persistent, office1, 1, null);
- checkEmpty(receiver1);
- checkEmpty(receiver2);
- checkEmpty(receiver3);
- checkContainsAndAcknowledge(msgs, receiver4, queue1);
- checkEmpty(receiver5);
-
- msgs = sendMessages("topic", persistent, office1, 1, null);
- checkEmpty(receiver1);
- checkEmpty(receiver2);
- checkEmpty(receiver3);
- checkEmpty(receiver4);
- checkContainsAndAcknowledge(msgs, receiver5, queue1);
-
- msgs = sendMessages("topic", persistent, office1, 1, null);
- checkContainsAndAcknowledge(msgs, receiver1, queue1);
- checkEmpty(receiver2);
- checkEmpty(receiver3);
- checkEmpty(receiver4);
- checkEmpty(receiver5);
-
- msgs = sendMessages("topic", persistent, office1, 1, null);
- checkEmpty(receiver1);
- checkContainsAndAcknowledge(msgs, receiver2, queue1);
- checkEmpty(receiver3);
- checkEmpty(receiver4);
- checkEmpty(receiver5);
-
-
- }
- finally
- {
- if (office1 != null)
- {
- office1.stop();
- }
-
- if (office2 != null)
- {
- office2.stop();
- }
-
- if (office3 != null)
- {
- office3.stop();
- }
-
- if (office4 != null)
- {
- office4.stop();
- }
-
- if (office5 != null)
- {
- office5.stop();
- }
-
- if (office6 != null)
- {
- office6.stop();
- }
- }
+ queue.add(receiver2);
+
+ dr.add(queue);
+
+ sendAndCheck(dr, receiver2);
+
+ sendAndCheck(dr, receiver2);
+
+ sendAndCheck(dr, receiver2);
}
+ // The router has multiple non local queues and one local queue with consumer
+ public void testRouterMultipleNonLocalOneLocalNoConsumer() throws Exception
+ {
+ DefaultRouter dr = new DefaultRouter();
+
+ ClusteredQueue remote1 = new SimpleQueue(false);
+
+ SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+
+ remote1.add(receiver1);
+
+ dr.add(remote1);
+
+
+ ClusteredQueue remote2 = new SimpleQueue(false);
+
+ SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+
+ remote2.add(receiver2);
+
+ dr.add(remote2);
+
+
+ ClusteredQueue remote3 = new SimpleQueue(false);
+
+ SimpleReceiver receiver3 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+
+ remote3.add(receiver3);
+
+ dr.add(remote3);
+
+
+ ClusteredQueue queue = new SimpleQueue(true);
+
+ SimpleReceiver receiver4 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+
+ queue.add(receiver4);
+
+ dr.add(queue);
+
+
+ sendAndCheck(dr, receiver4);
+
+ sendAndCheck(dr, receiver4);
+
+ sendAndCheck(dr, receiver4);
+ }
- protected void local(boolean persistent) throws Throwable
+ // The router has multiple non local queues and one local queue without consumer
+ public void testRouterMultipleNonLocalOneLocalWithConsumer() throws Exception
{
- ClusteredPostOffice office1 = null;
+ DefaultRouter dr = new DefaultRouter();
+
+ ClusteredQueue remote1 = new SimpleQueue(false);
- ClusteredPostOffice office2 = null;
+ SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
- ClusteredPostOffice office3 = null;
+ remote1.add(receiver1);
- ClusteredPostOffice office4 = null;
+ dr.add(remote1);
- ClusteredPostOffice office5 = null;
- ClusteredPostOffice office6 = null;
-
- try
- {
- office1 = createClusteredPostOffice(1, "testgroup");
-
- office2 = createClusteredPostOffice(2, "testgroup");
-
- office3 = createClusteredPostOffice(3, "testgroup");
-
- office4 = createClusteredPostOffice(4, "testgroup");
-
- office5 = createClusteredPostOffice(5, "testgroup");
-
- office6 = createClusteredPostOffice(6, "testgroup");
-
- LocalClusteredQueue queue1 = new LocalClusteredQueue(office2, 2, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
- Binding binding1 = office2.bindClusteredQueue("topic", queue1);
- SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
- queue1.add(receiver1);
-
- LocalClusteredQueue queue2 = new LocalClusteredQueue(office3, 3, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
- Binding binding2 = office3.bindClusteredQueue("topic", queue2);
- SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
- queue2.add(receiver2);
-
- LocalClusteredQueue queue3 = new LocalClusteredQueue(office4, 4, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
- Binding binding3 = office4.bindClusteredQueue("topic", queue3);
- SimpleReceiver receiver3 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
- queue3.add(receiver3);
-
- LocalClusteredQueue queue4 = new LocalClusteredQueue(office5, 5, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
- Binding binding4 = office5.bindClusteredQueue("topic", queue4);
- SimpleReceiver receiver4 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
- queue4.add(receiver4);
-
- LocalClusteredQueue queue5 = new LocalClusteredQueue(office6, 6, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
- Binding binding5 = office6.bindClusteredQueue("topic", queue5);
- SimpleReceiver receiver5 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
- queue5.add(receiver5);
-
- List msgs = sendMessages("topic", persistent, office2, 3, null);
- checkContainsAndAcknowledge(msgs, receiver1, queue1);
- checkEmpty(receiver2);
- checkEmpty(receiver3);
- checkEmpty(receiver4);
- checkEmpty(receiver5);
-
- msgs = sendMessages("topic", persistent, office2, 3, null);
- checkContainsAndAcknowledge(msgs, receiver1, queue1);
- checkEmpty(receiver2);
- checkEmpty(receiver3);
- checkEmpty(receiver4);
- checkEmpty(receiver5);
-
- msgs = sendMessages("topic", persistent, office2, 3, null);
- checkContainsAndAcknowledge(msgs, receiver1, queue1);
- checkEmpty(receiver2);
- checkEmpty(receiver3);
- checkEmpty(receiver4);
- checkEmpty(receiver5);
-
-
- msgs = sendMessages("topic", persistent, office3, 3, null);
- checkEmpty(receiver1);
- checkContainsAndAcknowledge(msgs, receiver2, queue1);
- checkEmpty(receiver3);
- checkEmpty(receiver4);
- checkEmpty(receiver5);
-
- msgs = sendMessages("topic", persistent, office3, 3, null);
- checkEmpty(receiver1);
- checkContainsAndAcknowledge(msgs, receiver2, queue1);
- checkEmpty(receiver3);
- checkEmpty(receiver4);
- checkEmpty(receiver5);
-
- msgs = sendMessages("topic", persistent, office3, 3, null);
- checkEmpty(receiver1);
- checkContainsAndAcknowledge(msgs, receiver2, queue1);
- checkEmpty(receiver3);
- checkEmpty(receiver4);
- checkEmpty(receiver5);
-
-
- }
- finally
- {
- if (office1 != null)
- {
- office1.stop();
- }
-
- if (office2 != null)
- {
- office2.stop();
- }
-
- if (office3 != null)
- {
- office3.stop();
- }
-
- if (office4 != null)
- {
- office4.stop();
- }
-
- if (office5 != null)
- {
- office5.stop();
- }
-
- if (office6 != null)
- {
- office6.stop();
- }
- }
+ ClusteredQueue remote2 = new SimpleQueue(false);
+
+ SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+
+ remote2.add(receiver2);
+
+ dr.add(remote2);
+
+
+ ClusteredQueue remote3 = new SimpleQueue(false);
+
+ SimpleReceiver receiver3 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+
+ remote3.add(receiver3);
+
+ dr.add(remote3);
+
+
+ ClusteredQueue queue = new SimpleQueue(true);
+
+
+ dr.add(queue);
+
+
+ sendAndCheck(dr, receiver1);
+
+ sendAndCheck(dr, receiver2);
+
+ sendAndCheck(dr, receiver3);
+
+ sendAndCheck(dr, receiver1);
+
+ sendAndCheck(dr, receiver2);
+
+ sendAndCheck(dr, receiver3);
}
+ // The router has one local without consumer and one non local queue
+ public void testRouterMultipleOneLocalWithoutConsumerOneNonLocal() throws Exception
+ {
+ DefaultRouter dr = new DefaultRouter();
+
+ ClusteredQueue remote1 = new SimpleQueue(false);
+
+ SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+
+ remote1.add(receiver1);
+
+ dr.add(remote1);
+
+ ClusteredQueue queue = new SimpleQueue(true);
+
+ dr.add(queue);
+
+ sendAndCheck(dr, receiver1);
+
+ sendAndCheck(dr, receiver1);
+
+ sendAndCheck(dr, receiver1);
+ }
+ private long nextId;
+ private void sendAndCheck(ClusterRouter router, SimpleReceiver receiver) throws Exception
+ {
+ Message msg = CoreMessageFactory.createCoreMessage(nextId++, false, null);
+
+ MessageReference ref = ms.reference(msg);
+
+ Delivery del = router.handle(null, ref, null);
+
+ assertNotNull(del);
+
+ assertTrue(del.isSelectorAccepted());
+
+ Thread.sleep(250);
+
+ List msgs = receiver.getMessages();
+
+ assertNotNull(msgs);
+
+ assertEquals(1, msgs.size());
+
+ Message msgRec = (Message)msgs.get(0);
+
+ assertTrue(msg == msgRec);
+
+ receiver.clear();
+ }
+
+
+
protected ClusteredPostOffice createClusteredPostOffice(int nodeId, String groupName) throws Exception
{
MessagePullPolicy redistPolicy = new NullMessagePullPolicy();
@@ -394,7 +397,218 @@
// Inner classes -------------------------------------------------
+ class SimpleQueue implements ClusteredQueue
+ {
+ private boolean local;
+
+ private Receiver receiver;
+
+ SimpleQueue(boolean local)
+ {
+ this.local = local;
+ }
+ public int getNodeId()
+ {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ public QueueStats getStats()
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ public boolean isLocal()
+ {
+ return local;
+ }
+
+ public Filter getFilter()
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ public String getName()
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ public boolean isClustered()
+ {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ public boolean acceptReliableMessages()
+ {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ public void activate()
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ public List browse()
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ public List browse(Filter filter)
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ public void clear()
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ public void close()
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ public void deactivate()
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ public void deliver(boolean synchronous)
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ public List delivering(Filter filter)
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ public long getChannelID()
+ {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ public boolean isActive()
+ {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ public boolean isRecoverable()
+ {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ public void load() throws Exception
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ public int messageCount()
+ {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ public void removeAllReferences() throws Throwable
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ public List undelivered(Filter filter)
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ public void unload() throws Exception
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ public Delivery handle(DeliveryObserver observer, MessageReference reference, Transaction tx)
+ {
+ if (receiver != null)
+ {
+ Delivery del = receiver.handle(observer, reference, tx);
+
+ return del;
+ }
+
+ return new SimpleDelivery(observer, reference);
+ }
+
+ public void acknowledge(Delivery d, Transaction tx) throws Throwable
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ public void cancel(Delivery d) throws Throwable
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ public boolean add(Receiver receiver)
+ {
+ this.receiver = receiver;
+
+ return true;
+ }
+
+ public boolean contains(Receiver receiver)
+ {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ public Iterator iterator()
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ public int numberOfReceivers()
+ {
+ if (receiver != null)
+ {
+ return 1;
+ }
+ else
+ {
+ return 0;
+ }
+ }
+
+ public boolean remove(Receiver receiver)
+ {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ }
+
+
}
Added: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RecoveryTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RecoveryTest.java 2006-10-09 19:00:59 UTC (rev 1456)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RecoveryTest.java 2006-10-09 20:39:19 UTC (rev 1457)
@@ -0,0 +1,313 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.test.messaging.core.plugin.postoffice.cluster;
+
+import java.util.List;
+
+import org.jboss.messaging.core.FilterFactory;
+import org.jboss.messaging.core.Message;
+import org.jboss.messaging.core.MessageReference;
+import org.jboss.messaging.core.plugin.contract.ClusteredPostOffice;
+import org.jboss.messaging.core.plugin.postoffice.Binding;
+import org.jboss.messaging.core.plugin.postoffice.cluster.ClusterRouterFactory;
+import org.jboss.messaging.core.plugin.postoffice.cluster.DefaultClusteredPostOffice;
+import org.jboss.messaging.core.plugin.postoffice.cluster.DefaultRouterFactory;
+import org.jboss.messaging.core.plugin.postoffice.cluster.LocalClusteredQueue;
+import org.jboss.messaging.core.plugin.postoffice.cluster.MessagePullPolicy;
+import org.jboss.messaging.core.plugin.postoffice.cluster.NullMessagePullPolicy;
+import org.jboss.messaging.core.tx.Transaction;
+import org.jboss.messaging.core.tx.TransactionException;
+import org.jboss.test.messaging.core.SimpleFilterFactory;
+import org.jboss.test.messaging.core.SimpleReceiver;
+import org.jboss.test.messaging.core.plugin.base.ClusteringTestBase;
+import org.jboss.test.messaging.util.CoreMessageFactory;
+
+import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
+
+/**
+ *
+ * A RecoveryTest
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ *
+ * $Id$
+ *
+ */
+public class RecoveryTest extends ClusteringTestBase
+{
+ // Constants -----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public RecoveryTest(String name)
+ {
+ super(name);
+ }
+
+ // Public --------------------------------------------------------
+
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ }
+
+ public void tearDown() throws Exception
+ {
+ super.tearDown();
+ }
+
+ public void testCrashBeforePersist() throws Exception
+ {
+ DefaultClusteredPostOffice office1 = null;
+
+ DefaultClusteredPostOffice office2 = null;
+
+ try
+ {
+ office1 = (DefaultClusteredPostOffice)createClusteredPostOffice(1, "testgroup");
+
+ office2 = (DefaultClusteredPostOffice)createClusteredPostOffice(2, "testgroup");
+
+ LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, 1, "queue1", channelIdManager.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
+ Binding binding1 =
+ office1.bindClusteredQueue("topic1", queue1);
+
+ LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, 2, "queue2", channelIdManager.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
+ Binding binding2 =
+ office2.bindClusteredQueue("topic1", queue2);
+
+ SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+ queue1.add(receiver1);
+ SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+ queue2.add(receiver2);
+
+ //This will make it fail after casting but before persisting the message in the db
+ office1.setFail(true, false);
+
+ Transaction tx = tr.createTransaction();
+
+ final int NUM_MESSAGES = 10;
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ Message msg = CoreMessageFactory.createCoreMessage(i);
+ msg.setReliable(true);
+
+ MessageReference ref = ms.reference(msg);
+
+ office1.route(ref, "topic1", tx);
+ }
+
+ Thread.sleep(1000);
+
+ List msgs = receiver1.getMessages();
+ assertTrue(msgs.isEmpty());
+
+ msgs = receiver2.getMessages();
+ assertTrue(msgs.isEmpty());
+
+ try
+ {
+ //An exception should be thrown
+ tx.commit();
+ fail();
+ }
+ catch (TransactionException e)
+ {
+ //Ok
+ }
+
+ Thread.sleep(1000);
+
+ msgs = receiver1.getMessages();
+ assertTrue(msgs.isEmpty());
+
+ msgs = receiver2.getMessages();
+ assertTrue(msgs.isEmpty());
+
+ //We now kill the office - this should make the other office do it's transaction check
+ office1.stop();
+
+ Thread.sleep(1000);
+
+ assertTrue(office1.getHoldingTransactions().isEmpty());
+
+ assertTrue(office2.getHoldingTransactions().isEmpty());
+
+ //The tx should be removed from the holding area and nothing should be received
+ msgs = receiver1.getMessages();
+ assertTrue(msgs.isEmpty());
+
+ msgs = receiver2.getMessages();
+ assertTrue(msgs.isEmpty());
+
+ }
+ finally
+ {
+ if (office1 != null)
+ {
+ office1.stop();
+ }
+
+ if (office2 != null)
+ {
+ office2.stop();
+ }
+ }
+ }
+
+ public void testCrashAfterPersist() throws Exception
+ {
+ DefaultClusteredPostOffice office1 = null;
+
+ DefaultClusteredPostOffice office2 = null;
+
+ try
+ {
+ office1 = (DefaultClusteredPostOffice)createClusteredPostOffice(1, "testgroup");
+
+ office2 = (DefaultClusteredPostOffice)createClusteredPostOffice(2, "testgroup");
+
+ LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, 1, "queue1", channelIdManager.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
+ Binding binding1 =
+ office1.bindClusteredQueue("topic1", queue1);
+
+ LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, 2, "queue2", channelIdManager.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
+ Binding binding2 =
+ office2.bindClusteredQueue("topic1", queue2);
+
+ SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+ queue1.add(receiver1);
+ SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+ queue2.add(receiver2);
+
+ //This will make it fail after casting and persisting the message in the db
+ office1.setFail(false, true);
+
+ Transaction tx = tr.createTransaction();
+
+ final int NUM_MESSAGES = 10;
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ Message msg = CoreMessageFactory.createCoreMessage(i);
+ msg.setReliable(true);
+
+ MessageReference ref = ms.reference(msg);
+
+ office1.route(ref, "topic1", tx);
+ }
+
+ Thread.sleep(1000);
+
+ List msgs = receiver1.getMessages();
+ assertTrue(msgs.isEmpty());
+
+ msgs = receiver2.getMessages();
+ assertTrue(msgs.isEmpty());
+
+ try
+ {
+ //An exception should be thrown
+ tx.commit();
+ fail();
+ }
+ catch (TransactionException e)
+ {
+ //Ok
+ }
+
+ Thread.sleep(1000);
+
+ msgs = receiver1.getMessages();
+ assertTrue(msgs.isEmpty());
+
+ msgs = receiver2.getMessages();
+ assertTrue(msgs.isEmpty());
+
+ //We now kill the office - this should make the other office do it's transaction check
+ office1.stop();
+
+ Thread.sleep(1000);
+
+ assertTrue(office1.getHoldingTransactions().isEmpty());
+
+ assertTrue(office2.getHoldingTransactions().isEmpty());
+
+ //The tx should be removed from the holding area and messages be received
+ msgs = receiver1.getMessages();
+ assertEquals(NUM_MESSAGES, msgs.size());
+
+ msgs = receiver2.getMessages();
+ assertEquals(NUM_MESSAGES, msgs.size());
+
+ }
+ finally
+ {
+ if (office1 != null)
+ {
+ office1.stop();
+ }
+
+ if (office2 != null)
+ {
+ office2.stop();
+ }
+ }
+ }
+
+
+ protected ClusteredPostOffice createClusteredPostOffice(int nodeId, String groupName) throws Exception
+ {
+ MessagePullPolicy redistPolicy = new NullMessagePullPolicy();
+
+ FilterFactory ff = new SimpleFilterFactory();
+
+ ClusterRouterFactory rf = new DefaultRouterFactory();
+
+ DefaultClusteredPostOffice postOffice =
+ new DefaultClusteredPostOffice(sc.getDataSource(), sc.getTransactionManager(),
+ null, true, nodeId, "Clustered", ms, pm, tr, ff, pool,
+ groupName,
+ JGroupsUtil.getControlStackProperties(),
+ JGroupsUtil.getDataStackProperties(),
+ 5000, 5000, redistPolicy, rf, 1, 1000);
+
+ postOffice.start();
+
+ return postOffice;
+ }
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
+
+
+
+
Modified: trunk/tests/src/org/jboss/test/messaging/jms/ManualClusteringTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/ManualClusteringTest.java 2006-10-09 19:00:59 UTC (rev 1456)
+++ trunk/tests/src/org/jboss/test/messaging/jms/ManualClusteringTest.java 2006-10-09 20:39:19 UTC (rev 1457)
@@ -43,6 +43,7 @@
*
* A ManualClusteringTest
*
+ * Nodes must be started up in order node1, node2, node3
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @version <tt>$Revision: 1.1 $</tt>
@@ -101,33 +102,33 @@
ic2 = new InitialContext(props2);
-// Properties props3 = new Properties();
-//
-// props3.put(Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory");
-// props3.put(Context.PROVIDER_URL, "jnp://localhost:1399");
-// props3.put(Context.URL_PKG_PREFIXES, "org.jnp.interfaces");
-//
-// ic3 = new InitialContext(props3);
+ Properties props3 = new Properties();
+ props3.put(Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory");
+ props3.put(Context.PROVIDER_URL, "jnp://localhost:1399");
+ props3.put(Context.URL_PKG_PREFIXES, "org.jnp.interfaces");
+
+ ic3 = new InitialContext(props3);
+
queue1 = (Queue)ic1.lookup("queue/testDistributedQueue");
queue2 = (Queue)ic2.lookup("queue/testDistributedQueue");
- //queue3 = (Queue)ic3.lookup("queue/ClusteredQueue1");
+ queue3 = (Queue)ic3.lookup("queue/testDistributedQueue");
topic1 = (Topic)ic1.lookup("topic/testDistributedTopic");
topic2 = (Topic)ic2.lookup("topic/testDistributedTopic");
- //topic3 = (Topic)ic3.lookup("topic/ClusteredTopic1");
+ topic3 = (Topic)ic3.lookup("topic/testDistributedTopic");
cf1 = (ConnectionFactory)ic1.lookup("/ConnectionFactory");
cf2 = (ConnectionFactory)ic2.lookup("/ConnectionFactory");
- //cf3 = (ConnectionFactory)ic3.lookup("/ConnectionFactory");
-
- log.info("setup done");
+ cf3 = (ConnectionFactory)ic3.lookup("/ConnectionFactory");
+
+ drainStuff();
}
protected void tearDown() throws Exception
@@ -139,37 +140,198 @@
ic2.close();
}
+ protected void drainStuff() throws Exception
+ {
+ Connection conn1 = null;
+
+ Connection conn2 = null;
+
+ Connection conn3 = null;
+
+ try
+ {
+ conn1 = cf1.createConnection();
+
+ conn2 = cf2.createConnection();
+
+ conn3 = cf3.createConnection();
+
+ Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ Session sess3 = conn3.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageConsumer cons1 = sess1.createConsumer(queue1);
+
+ MessageConsumer cons2 = sess2.createConsumer(queue2);
+
+ MessageConsumer cons3 = sess3.createConsumer(queue2);
+
+ conn1.start();
+
+ conn2.start();
+
+ conn3.start();
+
+ Message msg = null;
+
+ do
+ {
+ msg = cons1.receive(1000);
+ }
+ while (msg != null);
+
+ do
+ {
+ msg = cons2.receive(1000);
+ }
+ while (msg != null);
+
+ do
+ {
+ msg = cons3.receive(1000);
+ }
+ while (msg != null);
+ }
+ finally
+ {
+ if (conn1 != null) conn1.close();
+
+ if (conn2 != null) conn2.close();
+
+ if (conn3 != null) conn3.close();
+ }
+ }
+
+ public void testClusteredQueueLocalConsumerNonPersistent() throws Exception
+ {
+ clusteredQueueLocalConsumer(false);
+ }
+
+ public void testClusteredQueueLocalConsumerPersistent() throws Exception
+ {
+ clusteredQueueLocalConsumer(true);
+ }
+
+ public void testClusteredQueueNoLocalConsumerNonPersistent() throws Exception
+ {
+ clusteredQueueNoLocalConsumer(false);
+ }
+
+ public void testClusteredQueueNoLocalConsumerPersistent() throws Exception
+ {
+ clusteredQueueNoLocalConsumer(true);
+ }
+
+
+ public void testClusteredTopicNonDurableNonPersistent() throws Exception
+ {
+ clusteredTopicNonDurable(false);
+ }
+
+ public void testClusteredTopicNonDurablePersistent() throws Exception
+ {
+ clusteredTopicNonDurable(true);
+ }
+
+
+ public void testClusteredTopicNonDurableWithSelectorsNonPersistent() throws Exception
+ {
+ clusteredTopicNonDurableWithSelectors(false);
+ }
+
+ public void testClusteredTopicNonDurableWithSelectorsPersistent() throws Exception
+ {
+ clusteredTopicNonDurableWithSelectors(true);
+ }
+
+ public void testClusteredTopicDurableNonPersistent() throws Exception
+ {
+ clusteredTopicDurable(false);
+ }
+
+ public void testClusteredTopicDurablePersistent() throws Exception
+ {
+ clusteredTopicDurable(true);
+ }
+
+ public void testClusteredTopicSharedDurableLocalConsumerNonPersistent() throws Exception
+ {
+ clusteredTopicSharedDurableLocalConsumer(false);
+ }
+
+ public void testClusteredTopicSharedDurableLocalConsumerPersistent() throws Exception
+ {
+ clusteredTopicSharedDurableLocalConsumer(true);
+ }
+
+ public void testClusteredTopicSharedDurableNoLocalConsumerNonPersistent() throws Exception
+ {
+ clusteredTopicSharedDurableNoLocalConsumer(false);
+ }
+
+ public void testClusteredTopicSharedDurableNoLocalConsumerPersistent() throws Exception
+ {
+ clusteredTopicSharedDurableNoLocalConsumer(true);
+ }
+
+ public void testClusteredTopicSharedDurableNoLocalSubNonPersistent() throws Exception
+ {
+ clusteredTopicSharedDurableNoLocalSub(false);
+ }
+
+ public void testClusteredTopicSharedDurableNoLocalSubPersistent() throws Exception
+ {
+ clusteredTopicSharedDurableNoLocalSub(true);
+ }
+
+
+
+
/*
- * Each node had consumers, send message at node, make sure local consumer gets message
+ * Create a consumer on each queue on each node.
+ * Send messages in turn from all nodes.
+ * Ensure that the local consumer gets the message
*/
- public void testClusteredQueueLocalConsumerNonPersistent() throws Exception
+ protected void clusteredQueueLocalConsumer(boolean persistent) throws Exception
{
- log.info("starting test");
-
Connection conn1 = null;
Connection conn2 = null;
+
+ Connection conn3 = null;
try
{
conn1 = cf1.createConnection();
conn2 = cf2.createConnection();
+
+ conn3 = cf3.createConnection();
Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Session sess3 = conn3.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
MessageConsumer cons1 = sess1.createConsumer(queue1);
MessageConsumer cons2 = sess2.createConsumer(queue2);
+ MessageConsumer cons3 = sess3.createConsumer(queue3);
+
conn1.start();
conn2.start();
+ conn3.start();
+
+ //Send at node1
+
MessageProducer prod1 = sess1.createProducer(queue1);
- prod1.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ prod1.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
final int NUM_MESSAGES = 100;
@@ -180,66 +342,138 @@
prod1.send(tm);
}
- log.info("sent messages");
-
for (int i = 0; i < NUM_MESSAGES; i++)
{
- log.info("i is " + i);
-
TextMessage tm = (TextMessage)cons1.receive(1000);
assertNotNull(tm);
- log.info("Got message:" + tm);
-
assertEquals("message" + i, tm.getText());
}
Message m = cons2.receive(2000);
assertNull(m);
- }
- finally
- {
- try
+
+ m = cons3.receive(2000);
+
+ assertNull(m);
+
+ // Send at node2
+
+ MessageProducer prod2 = sess2.createProducer(queue2);
+
+ prod2.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
{
- if (conn1 != null) conn1.close();
+ TextMessage tm = sess2.createTextMessage("message" + i);
- if (conn2 != null) conn2.close();
+ prod2.send(tm);
}
- catch (Exception ignore)
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
{
+ TextMessage tm = (TextMessage)cons2.receive(1000);
+ assertNotNull(tm);
+
+ assertEquals("message" + i, tm.getText());
}
+
+ m = cons1.receive(2000);
+
+ assertNull(m);
+
+ m = cons3.receive(2000);
+
+ assertNull(m);
+
+ // Send at node3
+
+ MessageProducer prod3 = sess3.createProducer(queue3);
+
+ prod3.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = sess3.createTextMessage("message" + i);
+
+ prod3.send(tm);
+ }
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = (TextMessage)cons3.receive(1000);
+
+ assertNotNull(tm);
+
+ assertEquals("message" + i, tm.getText());
+ }
+
+ m = cons1.receive(2000);
+
+ assertNull(m);
+
+ m = cons2.receive(2000);
+
+ assertNull(m);
}
+ finally
+ {
+ if (conn1 != null) conn1.close();
+
+ if (conn2 != null) conn2.close();
+
+ if (conn3 != null) conn3.close();
+ }
}
- public void testClusteredQueueLocalConsumerPersistent() throws Exception
+
+
+
+ /*
+ * Create a consumer on two nodes out of three
+ * Send messages from the third node
+ * Ensure that the messages are received from the other two nodes in
+ * round robin order.
+ * (Note that this test depends on us using the default router which has
+ * this round robin behaviour)
+ */
+ protected void clusteredQueueNoLocalConsumer(boolean persistent) throws Exception
{
Connection conn1 = null;
Connection conn2 = null;
+
+ Connection conn3 = null;
try
{
conn1 = cf1.createConnection();
conn2 = cf2.createConnection();
+
+ conn3 = cf3.createConnection();
Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer cons1 = sess1.createConsumer(queue1);
+ Session sess3 = conn3.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer cons2 = sess2.createConsumer(queue2);
- conn1.start();
+ MessageConsumer cons3 = sess3.createConsumer(queue3);
conn2.start();
+ conn3.start();
+
+ //Send at node1
+
MessageProducer prod1 = sess1.createProducer(queue1);
- prod1.setDeliveryMode(DeliveryMode.PERSISTENT);
+ prod1.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
final int NUM_MESSAGES = 100;
@@ -250,202 +484,83 @@
prod1.send(tm);
}
- log.info("sent messages");
-
- for (int i = 0; i < NUM_MESSAGES; i++)
+ for (int i = 0; i < NUM_MESSAGES / 2; i++)
{
- log.info("i is " + i);
+ TextMessage tm = (TextMessage)cons2.receive(1000);
- TextMessage tm = (TextMessage)cons1.receive(1000);
-
assertNotNull(tm);
- log.info("Got message:" + tm);
-
- assertEquals("message" + i, tm.getText());
+ assertEquals("message" + i * 2, tm.getText());
}
- Message m = cons2.receive(2000);
-
- assertNull(m);
- }
- finally
- {
- try
+ for (int i = 0; i < NUM_MESSAGES / 2; i++)
{
- if (conn1 != null) conn1.close();
+ TextMessage tm = (TextMessage)cons3.receive(1000);
- if (conn2 != null) conn2.close();
- }
- catch (Exception ignore)
- {
+ assertNotNull(tm);
+ assertEquals("message" + (i * 2 + 1), tm.getText());
}
+
}
+ finally
+ {
+ if (conn1 != null) conn1.close();
+
+ if (conn2 != null) conn2.close();
+
+ if (conn3 != null) conn3.close();
+ }
}
-// /*
-// * No consumer on local node, send message at node, make sure remote consumer gets messages
-// */
-// public void testClusteredQueueNoLocalConsumerNonPersistent() throws Exception
-// {
-// Connection conn1 = null;
-//
-// Connection conn2 = null;
-// try
-// {
-// conn1 = cf1.createConnection();
-//
-// conn2 = cf2.createConnection();
-//
-// Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
-//
-// Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
-//
-// MessageConsumer cons2 = sess2.createConsumer(queue2);
-//
-// conn1.start();
-//
-// conn2.start();
-//
-// MessageProducer prod1 = sess1.createProducer(queue1);
-//
-// prod1.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-//
-// final int NUM_MESSAGES = 100;
-//
-// for (int i = 0; i < NUM_MESSAGES; i++)
-// {
-// TextMessage tm = sess1.createTextMessage("message" + i);
-//
-// prod1.send(tm);
-// }
-//
-// log.info("sent messages");
-//
-// for (int i = 0; i < NUM_MESSAGES; i++)
-// {
-// log.info("i is " + i);
-//
-// TextMessage tm = (TextMessage)cons2.receive(10000);
-//
-// assertNotNull(tm);
-//
-// log.info("Got message:" + tm);
-//
-// assertEquals("message" + i, tm.getText());
-// }
-//
-// }
-// finally
-// {
-// try
-// {
-// if (conn1 != null) conn1.close();
-//
-// if (conn2 != null) conn2.close();
-// }
-// catch (Exception ignore)
-// {
-//
-// }
-// }
-// }
-//
-//
-//
-// public void testClusteredQueueNoLocalConsumerPersistent() throws Exception
-// {
-// Connection conn1 = null;
-//
-// Connection conn2 = null;
-// try
-// {
-// conn1 = cf1.createConnection();
-//
-// conn2 = cf2.createConnection();
-//
-// Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
-//
-// Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
-//
-// MessageConsumer cons2 = sess2.createConsumer(queue2);
-//
-// conn1.start();
-//
-// conn2.start();
-//
-// MessageProducer prod1 = sess1.createProducer(queue1);
-//
-// prod1.setDeliveryMode(DeliveryMode.PERSISTENT);
-//
-// final int NUM_MESSAGES = 100;
-//
-// for (int i = 0; i < NUM_MESSAGES; i++)
-// {
-// TextMessage tm = sess1.createTextMessage("message" + i);
-//
-// prod1.send(tm);
-// }
-//
-// log.info("sent messages");
-//
-// for (int i = 0; i < NUM_MESSAGES; i++)
-// {
-// log.info("i is " + i);
-//
-// TextMessage tm = (TextMessage)cons2.receive(1000);
-//
-// assertNotNull(tm);
-//
-// log.info("Got message:" + tm);
-//
-// assertEquals("message" + i, tm.getText());
-// }
-//
-// }
-// finally
-// {
-// try
-// {
-// if (conn1 != null) conn1.close();
-//
-// if (conn2 != null) conn2.close();
-// }
-// catch (Exception ignore)
-// {
-//
-// }
-// }
-// }
-//
-
- public void testClusteredTopicNonDurableNonPersistent() throws Exception
+
+
+ /*
+ * Create non durable subscriptions on all nodes of the cluster.
+ * Ensure all messages are receive as appropriate
+ */
+ public void clusteredTopicNonDurable(boolean persistent) throws Exception
{
Connection conn1 = null;
Connection conn2 = null;
+
+ Connection conn3 = null;
try
{
conn1 = cf1.createConnection();
conn2 = cf2.createConnection();
+
+ conn3 = cf3.createConnection();
Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Session sess3 = conn3.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
MessageConsumer cons1 = sess1.createConsumer(topic1);
MessageConsumer cons2 = sess2.createConsumer(topic2);
+ MessageConsumer cons3 = sess3.createConsumer(topic3);
+
+ MessageConsumer cons4 = sess1.createConsumer(topic1);
+
+ MessageConsumer cons5 = sess2.createConsumer(topic2);
+
conn1.start();
conn2.start();
+ conn3.start();
+
+ //Send at node1
+
MessageProducer prod1 = sess1.createProducer(topic1);
- prod1.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ prod1.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
final int NUM_MESSAGES = 100;
@@ -455,79 +570,112 @@
prod1.send(tm);
}
-
- log.info("sent messages");
-
+
for (int i = 0; i < NUM_MESSAGES; i++)
{
- log.info("i is " + i);
-
TextMessage tm = (TextMessage)cons1.receive(1000);
assertNotNull(tm);
-
- log.info("Got message:" + tm);
-
- assertEquals("message" + i, tm.getText());
+
+ assertEquals("message" + i, tm.getText());
}
for (int i = 0; i < NUM_MESSAGES; i++)
{
- log.info("i is " + i);
-
TextMessage tm = (TextMessage)cons2.receive(1000);
-
+
assertNotNull(tm);
- log.info("Got message:" + tm);
-
assertEquals("message" + i, tm.getText());
}
-
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = (TextMessage)cons3.receive(1000);
+
+ assertNotNull(tm);
+
+ assertEquals("message" + i, tm.getText());
+ }
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = (TextMessage)cons4.receive(1000);
+
+ assertNotNull(tm);
+
+ assertEquals("message" + i, tm.getText());
+ }
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = (TextMessage)cons5.receive(1000);
+
+ assertNotNull(tm);
+
+ assertEquals("message" + i, tm.getText());
+ }
}
finally
{
- try
- {
- if (conn1 != null) conn1.close();
-
- if (conn2 != null) conn2.close();
- }
- catch (Exception ignore)
- {
-
- }
+ if (conn1 != null) conn1.close();
+
+ if (conn2 != null) conn2.close();
+
+ if (conn3 != null) conn3.close();
}
}
- public void testClusteredTopicNonDurablePersistent() throws Exception
+
+
+ /*
+ * Create non durable subscriptions on all nodes of the cluster.
+ * Include some with selectors
+ * Ensure all messages are receive as appropriate
+ */
+ public void clusteredTopicNonDurableWithSelectors(boolean persistent) throws Exception
{
Connection conn1 = null;
Connection conn2 = null;
+
+ Connection conn3 = null;
try
{
conn1 = cf1.createConnection();
conn2 = cf2.createConnection();
-
+
+ conn3 = cf3.createConnection();
+
Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Session sess3 = conn3.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
MessageConsumer cons1 = sess1.createConsumer(topic1);
MessageConsumer cons2 = sess2.createConsumer(topic2);
+ MessageConsumer cons3 = sess3.createConsumer(topic3);
+
+ MessageConsumer cons4 = sess1.createConsumer(topic1, "COLOUR='red'");
+
+ MessageConsumer cons5 = sess2.createConsumer(topic2, "COLOUR='blue'");
+
conn1.start();
conn2.start();
+ conn3.start();
+
+ //Send at node1
+
MessageProducer prod1 = sess1.createProducer(topic1);
- prod1.setDeliveryMode(DeliveryMode.PERSISTENT);
+ prod1.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
final int NUM_MESSAGES = 100;
@@ -535,85 +683,140 @@
{
TextMessage tm = sess1.createTextMessage("message" + i);
+ int c = i % 3;
+ if (c == 0)
+ {
+ tm.setStringProperty("COLOUR", "red");
+ }
+ else if (c == 1)
+ {
+ tm.setStringProperty("COLOUR", "blue");
+ }
+
prod1.send(tm);
}
-
- log.info("sent messages");
-
+
for (int i = 0; i < NUM_MESSAGES; i++)
{
- log.info("i is " + i);
-
TextMessage tm = (TextMessage)cons1.receive(1000);
assertNotNull(tm);
-
- log.info("Got message:" + tm);
-
- assertEquals("message" + i, tm.getText());
+
+ assertEquals("message" + i, tm.getText());
}
for (int i = 0; i < NUM_MESSAGES; i++)
{
- log.info("i is " + i);
-
TextMessage tm = (TextMessage)cons2.receive(1000);
-
+
assertNotNull(tm);
- log.info("Got message:" + tm);
-
assertEquals("message" + i, tm.getText());
}
-
- }
- finally
- {
- try
+ for (int i = 0; i < NUM_MESSAGES; i++)
{
- if (conn1 != null) conn1.close();
+ TextMessage tm = (TextMessage)cons3.receive(1000);
+
+ assertNotNull(tm);
+
+ assertEquals("message" + i, tm.getText());
+ }
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ int c = i % 3;
- if (conn2 != null) conn2.close();
- }
- catch (Exception ignore)
+ if (c == 0)
+ {
+ TextMessage tm = (TextMessage)cons4.receive(1000);
+
+ assertNotNull(tm);
+
+ assertEquals("message" + i, tm.getText());
+ }
+ }
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
{
+ int c = i % 3;
- }
+ if (c == 1)
+ {
+ TextMessage tm = (TextMessage)cons5.receive(1000);
+
+ assertNotNull(tm);
+
+ assertEquals("message" + i, tm.getText());
+ }
+ }
}
+ finally
+ {
+ if (conn1 != null) conn1.close();
+
+ if (conn2 != null) conn2.close();
+
+ if (conn3 != null) conn3.close();
+ }
}
- public void testClusteredTopicDurableNonPersistentLocal() throws Exception
+
+ /*
+ * Create durable subscriptions on all nodes of the cluster.
+ * Include a couple with selectors
+ * Ensure all messages are receive as appropriate
+ * None of the durable subs are shared
+ */
+ public void clusteredTopicDurable(boolean persistent) throws Exception
{
Connection conn1 = null;
Connection conn2 = null;
+
+ Connection conn3 = null;
try
{
conn1 = cf1.createConnection();
- conn1.setClientID("id1");
-
conn2 = cf2.createConnection();
- conn2.setClientID("id1");
+ conn3 = cf3.createConnection();
+
+ conn1.setClientID("wib1");
+
+ conn2.setClientID("wib1");
+
+ conn3.setClientID("wib1");
Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- MessageConsumer durable1 = sess1.createDurableSubscriber(topic1, "sub1");
- MessageConsumer durable2 = sess2.createDurableSubscriber(topic2, "sub1");
+ Session sess3 = conn3.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer cons1 = sess1.createDurableSubscriber(topic1, "sub1");
+
+ MessageConsumer cons2 = sess2.createDurableSubscriber(topic2, "sub2");
+
+ MessageConsumer cons3 = sess3.createDurableSubscriber(topic3, "sub3");
+
+ MessageConsumer cons4 = sess1.createDurableSubscriber(topic1, "sub4");
+
+ MessageConsumer cons5 = sess2.createDurableSubscriber(topic2, "sub5");
+
conn1.start();
conn2.start();
+ conn3.start();
+
+ //Send at node1
+
MessageProducer prod1 = sess1.createProducer(topic1);
- prod1.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ prod1.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
final int NUM_MESSAGES = 100;
@@ -623,92 +826,133 @@
prod1.send(tm);
}
-
- log.info("sent messages");
-
- //All the messages should be on the local sub
-
+
for (int i = 0; i < NUM_MESSAGES; i++)
{
- log.info("i is " + i);
+ TextMessage tm = (TextMessage)cons1.receive(1000);
- TextMessage tm = (TextMessage)durable1.receive(1000);
-
assertNotNull(tm);
+
+ assertEquals("message" + i, tm.getText());
+ }
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = (TextMessage)cons2.receive(1000);
+
+ assertNotNull(tm);
- log.info("Got message:" + tm);
-
assertEquals("message" + i, tm.getText());
}
- Message m = durable2.receive(2000);
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = (TextMessage)cons3.receive(1000);
+
+ assertNotNull(tm);
+
+ assertEquals("message" + i, tm.getText());
+ }
- assertNull(m);
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = (TextMessage)cons4.receive(1000);
+
+ assertNotNull(tm);
+
+ assertEquals("message" + i, tm.getText());
+ }
- durable1.close();
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = (TextMessage)cons5.receive(1000);
+
+ assertNotNull(tm);
+
+ assertEquals("message" + i, tm.getText());
+ }
- durable2.close();
+ cons1.close();
+ cons2.close();
+
+ cons3.close();
+
+ cons4.close();
+
+ cons5.close();
+
sess1.unsubscribe("sub1");
- sess2.unsubscribe("sub1");
-
+ sess2.unsubscribe("sub2");
+
+ sess3.unsubscribe("sub3");
+
+ sess1.unsubscribe("sub4");
+
+ sess2.unsubscribe("sub5");
+
}
finally
{
- try
- {
- if (conn1 != null) conn1.close();
-
- if (conn2 != null) conn2.close();
- }
- catch (Exception ignore)
- {
-
- }
+ if (conn1 != null) conn1.close();
+
+ if (conn2 != null) conn2.close();
+
+ if (conn3 != null) conn3.close();
}
}
- public void testClusteredTopicDurablePersistentLocal() throws Exception
+
+
+
+ /*
+ * Create shared durable subs on multiple nodes, the local instance should always get the message
+ */
+ protected void clusteredTopicSharedDurableLocalConsumer(boolean persistent) throws Exception
{
Connection conn1 = null;
Connection conn2 = null;
+
+ Connection conn3 = null;
try
{
conn1 = cf1.createConnection();
- conn1.setClientID("id1");
-
conn2 = cf2.createConnection();
- conn2.setClientID("id1");
+ conn3 = cf3.createConnection();
+
+ conn1.setClientID("wib1");
+
+ conn2.setClientID("wib1");
+
+ conn3.setClientID("wib1");
Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
- try
- {
- sess1.unsubscribe("sub1");
-
- sess2.unsubscribe("sub1");
- }
- catch (Exception ignore)
- {
- }
+ Session sess3 = conn3.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer durable1 = sess1.createDurableSubscriber(topic1, "sub1");
+ MessageConsumer cons1 = sess1.createDurableSubscriber(topic1, "sub1");
- MessageConsumer durable2 = sess2.createDurableSubscriber(topic2, "sub1");
+ MessageConsumer cons2 = sess2.createDurableSubscriber(topic2, "sub1");
+ MessageConsumer cons3 = sess3.createDurableSubscriber(topic3, "sub1");
+
conn1.start();
conn2.start();
+ conn3.start();
+
+ //Send at node1
+
MessageProducer prod1 = sess1.createProducer(topic1);
- prod1.setDeliveryMode(DeliveryMode.PERSISTENT);
+ prod1.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
final int NUM_MESSAGES = 100;
@@ -719,76 +963,169 @@
prod1.send(tm);
}
- log.info("sent messages");
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = (TextMessage)cons1.receive(1000);
+
+ assertNotNull(tm);
+
+ assertEquals("message" + i, tm.getText());
+ }
- //All the messages should be on the local sub
+ Message m = cons2.receive(2000);
+ assertNull(m);
+
+ m = cons3.receive(2000);
+
+ assertNull(m);
+
+ // Send at node2
+
+ MessageProducer prod2 = sess2.createProducer(topic2);
+
+ prod2.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
for (int i = 0; i < NUM_MESSAGES; i++)
{
- log.info("i is " + i);
+ TextMessage tm = sess2.createTextMessage("message" + i);
- TextMessage tm = (TextMessage)durable1.receive(1000);
+ prod2.send(tm);
+ }
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = (TextMessage)cons2.receive(1000);
assertNotNull(tm);
+
+ assertEquals("message" + i, tm.getText());
+ }
+
+ m = cons1.receive(2000);
+
+ assertNull(m);
+
+ m = cons3.receive(2000);
+
+ assertNull(m);
+
+ // Send at node3
+
+ MessageProducer prod3 = sess3.createProducer(topic3);
+
+ prod3.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = sess3.createTextMessage("message" + i);
- log.info("Got message:" + tm);
+ prod3.send(tm);
+ }
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = (TextMessage)cons3.receive(1000);
+ assertNotNull(tm);
+
assertEquals("message" + i, tm.getText());
}
- Message m = durable2.receive(2000);
+ m = cons1.receive(2000);
assertNull(m);
+ m = cons2.receive(2000);
+
+ assertNull(m);
+
+ cons1.close();
+
+ cons2.close();
+
+ cons3.close();
+
+ //Need to unsubscribe on any node that the durable sub was created on
+
sess1.unsubscribe("sub1");
sess2.unsubscribe("sub1");
+ sess3.unsubscribe("sub1");
}
finally
{
- try
- {
- if (conn1 != null) conn1.close();
-
- if (conn2 != null) conn2.close();
- }
- catch (Exception ignore)
- {
-
- }
+ if (conn1 != null) conn1.close();
+
+ if (conn2 != null) conn2.close();
+
+ if (conn3 != null) conn3.close();
}
}
-
- public void testClusteredTopicDurableNonPersistentNotLocal() throws Exception
+
+ /*
+ * Create shared durable subs on multiple nodes, but without consumer on local node
+ * even thought there is durable sub
+ * should round robin
+ * note that this test assumes round robin
+ */
+ protected void clusteredTopicSharedDurableNoLocalConsumer(boolean persistent) throws Exception
{
Connection conn1 = null;
Connection conn2 = null;
+
+ Connection conn3 = null;
try
{
conn1 = cf1.createConnection();
- conn1.setClientID("id1");
-
conn2 = cf2.createConnection();
- conn2.setClientID("id1");
+ conn3 = cf3.createConnection();
+
+ conn1.setClientID("wib1");
+
+ conn2.setClientID("wib1");
+
+ conn3.setClientID("wib1");
Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer durable2 = sess2.createDurableSubscriber(topic2, "sub1");
+ Session sess3 = conn3.createSession(false, Session.AUTO_ACKNOWLEDGE);
- conn1.start();
+ MessageConsumer cons1 = sess1.createDurableSubscriber(topic1, "sub1");
+ //Now close it on node 1
+ conn1.close();
+
+ conn1 = cf1.createConnection();
+
+ conn1.setClientID("wib1");
+
+ sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ //This means the durable sub is inactive on node1
+
+ MessageConsumer cons2 = sess2.createDurableSubscriber(topic2, "sub1");
+
+ MessageConsumer cons3 = sess3.createDurableSubscriber(topic3, "sub1");
+
conn2.start();
+ conn3.start();
+
+ //Send at node1
+
+ //Should round robin between the other 2 since there is no active consumer on sub1 on node1
+
MessageProducer prod1 = sess1.createProducer(topic1);
- prod1.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ prod1.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
final int NUM_MESSAGES = 100;
@@ -799,71 +1136,92 @@
prod1.send(tm);
}
- log.info("sent messages");
+ for (int i = 0; i < NUM_MESSAGES / 2; i++)
+ {
+ TextMessage tm = (TextMessage)cons2.receive(1000);
+
+ assertNotNull(tm);
+
+ assertEquals("message" + i * 2, tm.getText());
+ }
- //All the messages should be on the non local sub
-
- for (int i = 0; i < NUM_MESSAGES; i++)
+ for (int i = 0; i < NUM_MESSAGES / 2; i++)
{
- log.info("i is " + i);
+ TextMessage tm = (TextMessage)cons3.receive(1000);
- TextMessage tm = (TextMessage)durable2.receive(1000);
-
assertNotNull(tm);
- log.info("Got message:" + tm);
-
- assertEquals("message" + i, tm.getText());
+ assertEquals("message" + (i * 2 + 1), tm.getText());
}
- durable2.close();
-
+ cons2.close();
+
+ cons3.close();
+
+ sess1.unsubscribe("sub1");
+
sess2.unsubscribe("sub1");
+ sess3.unsubscribe("sub1");
+
}
finally
{
- try
- {
- if (conn1 != null) conn1.close();
-
- if (conn2 != null) conn2.close();
- }
- catch (Exception ignore)
- {
-
- }
+ if (conn1 != null) conn1.close();
+
+ if (conn2 != null) conn2.close();
+
+ if (conn3 != null) conn3.close();
}
}
- public void testClusteredTopicDurablePersistentNotLocal() throws Exception
+
+
+ /*
+ * Create shared durable subs on multiple nodes, but without sub on local node
+ * should round robin
+ * note that this test assumes round robin
+ */
+ protected void clusteredTopicSharedDurableNoLocalSub(boolean persistent) throws Exception
{
Connection conn1 = null;
Connection conn2 = null;
+
+ Connection conn3 = null;
try
{
conn1 = cf1.createConnection();
- conn1.setClientID("id1");
-
conn2 = cf2.createConnection();
- conn2.setClientID("id1");
+ conn3 = cf3.createConnection();
+
+ conn2.setClientID("wib1");
+
+ conn3.setClientID("wib1");
Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- MessageConsumer durable2 = sess2.createDurableSubscriber(topic2, "sub1");
- conn1.start();
+ Session sess3 = conn3.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageConsumer cons2 = sess2.createDurableSubscriber(topic2, "sub1");
+ MessageConsumer cons3 = sess3.createDurableSubscriber(topic3, "sub1");
+
conn2.start();
+ conn3.start();
+
+ //Send at node1
+
+ //Should round robin between the other 2 since there is no active consumer on sub1 on node1
+
MessageProducer prod1 = sess1.createProducer(topic1);
- prod1.setDeliveryMode(DeliveryMode.PERSISTENT);
+ prod1.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
final int NUM_MESSAGES = 100;
@@ -874,44 +1232,43 @@
prod1.send(tm);
}
- log.info("sent messages");
+ for (int i = 0; i < NUM_MESSAGES / 2; i++)
+ {
+ TextMessage tm = (TextMessage)cons2.receive(1000);
+
+ assertNotNull(tm);
+
+ assertEquals("message" + i * 2, tm.getText());
+ }
- //All the messages should be on the non local sub
-
- for (int i = 0; i < NUM_MESSAGES; i++)
+ for (int i = 0; i < NUM_MESSAGES / 2; i++)
{
- log.info("i is " + i);
+ TextMessage tm = (TextMessage)cons3.receive(1000);
- TextMessage tm = (TextMessage)durable2.receive(1000);
-
assertNotNull(tm);
- log.info("Got message:" + tm);
-
- assertEquals("message" + i, tm.getText());
+ assertEquals("message" + (i * 2 + 1), tm.getText());
}
- durable2.close();
+ cons2.close();
+ cons3.close();
+
sess2.unsubscribe("sub1");
+ sess3.unsubscribe("sub1");
+
}
finally
{
- try
- {
- if (conn1 != null) conn1.close();
-
- if (conn2 != null) conn2.close();
- }
- catch (Exception ignore)
- {
-
- }
+ if (conn1 != null) conn1.close();
+
+ if (conn2 != null) conn2.close();
+
+ if (conn3 != null) conn3.close();
}
}
-
class MyListener implements MessageListener
{
private int i;
More information about the jboss-cvs-commits
mailing list