[jboss-cvs] JBoss Messaging SVN: r1434 - in trunk: src/main/org/jboss/messaging/core/plugin src/main/org/jboss/messaging/core/plugin/postoffice/cluster tests/src/org/jboss/test/messaging/core tests/src/org/jboss/test/messaging/core/local/base tests/src/org/jboss/test/messaging/core/paging tests/src/org/jboss/test/messaging/core/paging/base tests/src/org/jboss/test/messaging/core/plugin tests/src/org/jboss/test/messaging/core/plugin/base tests/src/org/jboss/test/messaging/core/plugin/postoffice tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Oct 5 12:10:45 EDT 2006
Author: timfox
Date: 2006-10-05 12:10:31 -0400 (Thu, 05 Oct 2006)
New Revision: 1434
Modified:
trunk/src/main/org/jboss/messaging/core/plugin/IdManager.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesRequest.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/StatsSender.java
trunk/tests/src/org/jboss/test/messaging/core/SimpleDeliveryTest.java
trunk/tests/src/org/jboss/test/messaging/core/local/base/PagingFilteredQueueTestBase.java
trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_ReloadTest.java
trunk/tests/src/org/jboss/test/messaging/core/paging/base/PagingStateTestBase.java
trunk/tests/src/org/jboss/test/messaging/core/plugin/IdManagerTest.java
trunk/tests/src/org/jboss/test/messaging/core/plugin/JDBCPersistenceManagerTest.java
trunk/tests/src/org/jboss/test/messaging/core/plugin/base/ClusteringTestBase.java
trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/DefaultPostOfficeTest.java
trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeTest.java
trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultRouterTest.java
trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RedistributionTest.java
Log:
A few tweaks and fixes
Modified: trunk/src/main/org/jboss/messaging/core/plugin/IdManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/IdManager.java 2006-10-04 23:41:46 UTC (rev 1433)
+++ trunk/src/main/org/jboss/messaging/core/plugin/IdManager.java 2006-10-05 16:10:31 UTC (rev 1434)
@@ -50,6 +50,8 @@
protected String counterName;
+ private boolean started;
+
public IdManager(String counterName, int bigBlockSize, PersistenceManager pm) throws Exception
{
this.bigBlockSize = bigBlockSize;
@@ -59,18 +61,25 @@
this.counterName = counterName;
}
- public void start() throws Exception
+ public synchronized void start() throws Exception
{
getNextBigBlock();
+
+ started = true;
}
- public void stop() throws Exception
+ public synchronized void stop() throws Exception
{
- //NOOP
+ started = false;
}
public synchronized IdBlock getIdBlock(int size) throws Exception
{
+ if (!started)
+ {
+ throw new IllegalStateException(this + " is not started");
+ }
+
if (size <= 0)
{
throw new IllegalArgumentException("block size must be > 0");
@@ -104,7 +113,7 @@
{
nextBlock = pm.reserveIDBlock(counterName, bigBlockSize);
- if (trace) { log.trace("Retrieved nex block of size " + bigBlockSize + " from pm starting at " + nextBlock); }
+ if (trace) { log.trace("Retrieved next block of size " + bigBlockSize + " from pm starting at " + nextBlock); }
high = nextBlock + bigBlockSize - 1;
}
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java 2006-10-04 23:41:46 UTC (rev 1433)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java 2006-10-05 16:10:31 UTC (rev 1434)
@@ -941,7 +941,7 @@
if (nodeId == this.nodeId)
{
//Sanity check
- throw new IllegalStateException("Cannot update queue stats for current node");
+ throw new IllegalStateException("Received stats from node with id that matches this nodes id. You may have started two or more nodes with the same node id!");
}
Map nameMap = (Map)nameMaps.get(new Integer(nodeId));
@@ -963,37 +963,40 @@
if (bb == null)
{
- throw new IllegalStateException("Cannot find binding for queue name: " + st.getQueueName());
+ //I guess this is possible if the queue was unbound
+ if (trace) { log.trace(this.nodeId + " cannot find binding for queue " + st.getQueueName() + " it could have been unbound"); }
}
-
- RemoteQueueStub stub = (RemoteQueueStub)bb.getQueue();
-
- stub.setStats(st);
-
- if (trace) { log.trace(this.nodeId + " setting stats: " + st + " on remote stub " + stub.getName()); }
-
- ClusterRouter router = (ClusterRouter)routerMap.get(st.getQueueName());
-
- //Maybe the local queue now wants to pull message(s) from the remote queue given that the
- //stats for the remote queue have changed
- LocalClusteredQueue localQueue = router.getLocalQueue();
-
- if (localQueue != null)
- {
- RemoteQueueStub toQueue = (RemoteQueueStub)messagePullPolicy.chooseQueue(router.getQueues());
+ else
+ {
+ RemoteQueueStub stub = (RemoteQueueStub)bb.getQueue();
- if (trace) { log.trace(this.nodeId + " recalculated pull queue for queue " + st.getQueueName() + " to be " + toQueue); }
+ stub.setStats(st);
- if (toQueue != null)
- {
- localQueue.setPullInfo(toQueue, pullSize);
+ if (trace) { log.trace(this.nodeId + " setting stats: " + st + " on remote stub " + stub.getName()); }
+
+ ClusterRouter router = (ClusterRouter)routerMap.get(st.getQueueName());
+
+ //Maybe the local queue now wants to pull message(s) from the remote queue given that the
+ //stats for the remote queue have changed
+ LocalClusteredQueue localQueue = router.getLocalQueue();
+
+ if (localQueue != null)
+ {
+ RemoteQueueStub toQueue = (RemoteQueueStub)messagePullPolicy.chooseQueue(router.getQueues());
- //We now trigger delivery - this may cause a pull event
- localQueue.deliver(false);
-
- if (trace) { log.trace(this.nodeId + " triggered delivery for " + localQueue.getName()); }
- }
- }
+ if (trace) { log.trace(this.nodeId + " recalculated pull queue for queue " + st.getQueueName() + " to be " + toQueue); }
+
+ if (toQueue != null)
+ {
+ localQueue.setPullInfo(toQueue, pullSize);
+
+ //We now trigger delivery - this may cause a pull event
+ localQueue.deliver(false);
+
+ if (trace) { log.trace(this.nodeId + " triggered delivery for " + localQueue.getName()); }
+ }
+ }
+ }
}
}
}
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java 2006-10-04 23:41:46 UTC (rev 1433)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java 2006-10-05 16:10:31 UTC (rev 1434)
@@ -333,6 +333,8 @@
Iterator iter = msgs.iterator();
+ boolean containsReliable = false;
+
while (iter.hasNext())
{
org.jboss.messaging.core.Message msg = (org.jboss.messaging.core.Message)iter.next();
@@ -341,6 +343,8 @@
{
//It will already have been persisted on the other node
msg.setPersisted(true);
+
+ containsReliable = true;
}
MessageReference ref = null;
@@ -370,7 +374,7 @@
del.acknowledge(tx);
}
-
+
tx.commit();
//TODO what if commit throws an exception - this means the commit message doesn't hit the
@@ -380,7 +384,9 @@
//and send a checkrequest
//This applies to a normal message and messages requests too
- if (!msgs.isEmpty())
+ //We only need to send a commit message if there were reliable messages since otherwise
+ //the transaction wouldn't have been added in the holding area
+ if (containsReliable && isRecoverable())
{
req = new PullMessagesRequest(this.nodeId, tx.getId());
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesRequest.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesRequest.java 2006-10-04 23:41:46 UTC (rev 1433)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesRequest.java 2006-10-05 16:10:31 UTC (rev 1434)
@@ -43,6 +43,8 @@
public class PullMessagesRequest extends TransactionRequest implements ClusterTransaction
{
private static final Logger log = Logger.getLogger(PullMessagesRequest.class);
+
+ private boolean trace = log.isTraceEnabled();
private String queueName;
@@ -74,10 +76,14 @@
{
TransactionId id = new TransactionId(nodeId, txId);
+ if (trace) { log.trace("Executing PullMessagesRequest with id: " + id + " hold: " + hold); }
+
if (hold)
- {
+ {
List dels = office.getDeliveries(queueName, numMessages);
+ if (trace) { log.trace("PullMessagesRequest got " + dels.size() + " deliveries"); }
+
PullMessagesResponse response = new PullMessagesResponse(dels.size());
if (!dels.isEmpty())
@@ -91,10 +97,10 @@
//Add it to internal list
if (reliableDels == null)
{
- reliableDels = new ArrayList();
-
- reliableDels.add(del);
+ reliableDels = new ArrayList();
}
+
+ reliableDels.add(del);
}
else
{
@@ -104,7 +110,7 @@
response.addMessage(del.getReference().getMessage());
}
-
+
if (reliableDels != null)
{
//Add this to the holding area
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/StatsSender.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/StatsSender.java 2006-10-04 23:41:46 UTC (rev 1433)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/StatsSender.java 2006-10-05 16:10:31 UTC (rev 1434)
@@ -47,6 +47,8 @@
private Timer timer;
private long period;
+
+ private SendStatsTimerTask task;
StatsSender(PostOfficeInternal office, long period)
{
@@ -68,9 +70,11 @@
//Add a random delay to prevent all timers starting at once
long delay = (long)(period * Math.random());
- TimerTask task = new SendStatsTimerTask();
+ task = new SendStatsTimerTask();
+
+ timer.schedule(task, delay, period);
- timer.schedule(task, delay, period);
+ started = true;
}
public synchronized void stop() throws Exception
@@ -79,16 +83,29 @@
{
return;
}
+
+ //Wait for timer task to stop
+ task.stop();
+
timer.cancel();
timer = null;
+
+ started = false;
}
class SendStatsTimerTask extends TimerTask
{
+ private boolean stopping;
+ private boolean stopped;
+
+ private Object stopLock = new Object();
+
public void run()
{
+ checkStop();
+
try
{
office.sendQueueStats();
@@ -97,6 +114,42 @@
{
log.error("Failed to send statistics", e);
}
- }
+
+ checkStop();
+ }
+
+ private void checkStop()
+ {
+ synchronized (stopLock)
+ {
+ if (stopping)
+ {
+ cancel();
+ stopped = true;
+ stopLock.notify();
+ return;
+ }
+ }
+ }
+
+ void stop()
+ {
+ synchronized (stopLock)
+ {
+ stopping = true;
+
+ while (!stopped)
+ {
+ try
+ {
+ stopLock.wait();
+ }
+ catch (InterruptedException e)
+ {
+ //Ignore
+ }
+ }
+ }
+ }
}
}
Modified: trunk/tests/src/org/jboss/test/messaging/core/SimpleDeliveryTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/SimpleDeliveryTest.java 2006-10-04 23:41:46 UTC (rev 1433)
+++ trunk/tests/src/org/jboss/test/messaging/core/SimpleDeliveryTest.java 2006-10-05 16:10:31 UTC (rev 1434)
@@ -116,7 +116,10 @@
true, true, true, 100);
pm.start();
- TransactionRepository tr = new TransactionRepository(pm, new IdManager("TRANSACTION_ID", 10, pm));
+ IdManager idm = new IdManager("TRANSACTION_ID", 10, pm);
+ idm.start();
+
+ TransactionRepository tr = new TransactionRepository(pm, idm);
tr.start();
Transaction tx = tr.createTransaction();
Modified: trunk/tests/src/org/jboss/test/messaging/core/local/base/PagingFilteredQueueTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/local/base/PagingFilteredQueueTestBase.java 2006-10-04 23:41:46 UTC (rev 1433)
+++ trunk/tests/src/org/jboss/test/messaging/core/local/base/PagingFilteredQueueTestBase.java 2006-10-05 16:10:31 UTC (rev 1434)
@@ -96,6 +96,8 @@
protected PagingFilteredQueue queue;
+ protected IdManager idm;
+
// Constructors --------------------------------------------------
public PagingFilteredQueueTestBase(String name)
@@ -117,7 +119,10 @@
true, true, true, 100);
pm.start();
- tr = new TransactionRepository(pm, new IdManager("TRANSACTION_ID", 10, pm));
+ idm = new IdManager("TRANSACTION_ID", 10, pm);
+ idm.start();
+
+ tr = new TransactionRepository(pm, idm);
tr.start();
ms = new SimpleMessageStore();
@@ -127,8 +132,15 @@
public void tearDown() throws Exception
{
sc.stop();
- sc = null;
+ pm.stop();
+ idm.stop();
+ tr.stop();
+ ms.stop();
+
+ sc = null;
+ pm = null;
+ idm = null;
ms = null;
tr = null;
super.tearDown();
Modified: trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_ReloadTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_ReloadTest.java 2006-10-04 23:41:46 UTC (rev 1433)
+++ trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_ReloadTest.java 2006-10-05 16:10:31 UTC (rev 1434)
@@ -120,7 +120,7 @@
true, true, true, 100);
pm.start();
- tr = new TransactionRepository(pm, new IdManager("TRANSACTION_ID", 10, pm));
+ tr = new TransactionRepository(pm, idm);
tr.start();
ms = new SimpleMessageStore();
@@ -217,7 +217,7 @@
true, true, true, 100);
pm.start();
- tr = new TransactionRepository(pm, new IdManager("TRANSACTION_ID", 10, pm));
+ tr = new TransactionRepository(pm, idm);
tr.start();
ms = new SimpleMessageStore();
Modified: trunk/tests/src/org/jboss/test/messaging/core/paging/base/PagingStateTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/paging/base/PagingStateTestBase.java 2006-10-04 23:41:46 UTC (rev 1433)
+++ trunk/tests/src/org/jboss/test/messaging/core/paging/base/PagingStateTestBase.java 2006-10-05 16:10:31 UTC (rev 1434)
@@ -71,6 +71,7 @@
protected PersistenceManager pm;
protected SimpleMessageStore ms;
protected TransactionRepository tr;
+ protected IdManager idm;
// Constructors --------------------------------------------------
@@ -98,7 +99,10 @@
true, true, true, 100);
pm.start();
- tr = new TransactionRepository(pm, new IdManager("TRANSACTION_ID", 10, pm));
+ idm = new IdManager("TRANSACTION_ID", 10, pm);
+ idm.start();
+
+ tr = new TransactionRepository(pm, idm);
tr.start();
ms = new SimpleMessageStore();
Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/IdManagerTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/IdManagerTest.java 2006-10-04 23:41:46 UTC (rev 1433)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/IdManagerTest.java 2006-10-05 16:10:31 UTC (rev 1434)
@@ -90,6 +90,7 @@
public void test1() throws Exception
{
IdManager idm = new IdManager("test_counter", 1000, pm);
+ idm.start();
int blockSize = 37;
@@ -105,7 +106,24 @@
nextLow = block.getHigh() + 1;
}
+
+ idm.stop();
}
+
+ public void test2() throws Exception
+ {
+ IdManager idm = new IdManager("test_counter2", 100, pm);
+ idm.start();
+
+ for (int i = 0; i < 1000; i++)
+ {
+ long id = idm.getId();
+
+ assertEquals(i, id);
+ }
+
+ idm.stop();
+ }
// Package protected ---------------------------------------------
Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/JDBCPersistenceManagerTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/JDBCPersistenceManagerTest.java 2006-10-04 23:41:46 UTC (rev 1433)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/JDBCPersistenceManagerTest.java 2006-10-05 16:10:31 UTC (rev 1434)
@@ -1161,7 +1161,11 @@
doSetup(batch, 100);
Channel channel = new SimpleChannel(0, ms);
- TransactionRepository txRep = new TransactionRepository(pm, new IdManager("TRANSACTION_ID", 10, pm));
+
+ IdManager idm = new IdManager("TRANSACTION_ID", 10, pm);
+ idm.start();
+
+ TransactionRepository txRep = new TransactionRepository(pm, idm);
txRep.start();
log.debug("transaction log started");
@@ -1268,7 +1272,11 @@
doSetup(batch, 100);
Channel channel = new SimpleChannel(0, ms);
- TransactionRepository txRep = new TransactionRepository(pm, new IdManager("TRANSACTION_ID", 10, pm));
+
+ IdManager idm = new IdManager("TRANSACTION_ID", 10, pm);
+ idm.start();
+
+ TransactionRepository txRep = new TransactionRepository(pm, idm);
txRep.start();
Message[] messages = createMessages(10);
Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/base/ClusteringTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/base/ClusteringTestBase.java 2006-10-04 23:41:46 UTC (rev 1433)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/base/ClusteringTestBase.java 2006-10-05 16:10:31 UTC (rev 1434)
@@ -73,8 +73,10 @@
protected ServiceContainer sc;
- protected IdManager im;
+ protected IdManager channelIdManager;
+ protected IdManager transactionIdManager;
+
protected PersistenceManager pm;
protected MessageStore ms;
@@ -102,10 +104,13 @@
pm =
new JDBCPersistenceManager(sc.getDataSource(), sc.getTransactionManager(), null,
- true, true, true, 100);
+ true, false, true, 100);
pm.start();
- tr = new TransactionRepository(pm, new IdManager("TRANSACTION_ID", 10, pm));
+ transactionIdManager = new IdManager("TRANSACTION_ID", 10, pm);
+ transactionIdManager.start();
+
+ tr = new TransactionRepository(pm, transactionIdManager);
tr.start();
ms = new SimpleMessageStore();
@@ -113,7 +118,8 @@
pool = new QueuedExecutorPool(10);
- im = new IdManager("CHANNEL_ID", 10, pm);
+ channelIdManager = new IdManager("CHANNEL_ID", 10, pm);
+ channelIdManager.start();
log.debug("setup done");
}
@@ -128,6 +134,8 @@
pm.stop();
tr.stop();
ms.stop();
+ transactionIdManager.stop();
+ channelIdManager.stop();
super.tearDown();
}
@@ -269,7 +277,7 @@
list.add(msg);
}
- Thread.sleep(2000);
+ Thread.sleep(1000);
return list;
}
Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/DefaultPostOfficeTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/DefaultPostOfficeTest.java 2006-10-04 23:41:46 UTC (rev 1433)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/DefaultPostOfficeTest.java 2006-10-05 16:10:31 UTC (rev 1434)
@@ -120,7 +120,7 @@
Filter filter1 = new Selector("x = 'cheese'");
Filter filter2 = new Selector("y = 'bread'");
- PagingFilteredQueue queue1 = new PagingFilteredQueue("durableQueue", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null);
+ PagingFilteredQueue queue1 = new PagingFilteredQueue("durableQueue", channelIdManager.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null);
Binding binding1 =
@@ -138,7 +138,7 @@
}
//Bind one non durable
- PagingFilteredQueue queue2 = new PagingFilteredQueue("nonDurableQueue", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+ PagingFilteredQueue queue2 = new PagingFilteredQueue("nonDurableQueue", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
Binding binding2 =
office1.bindQueue("condition2", queue2);
@@ -225,42 +225,42 @@
{
office = createPostOffice();
- PagingFilteredQueue queue1 = new PagingFilteredQueue("queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+ PagingFilteredQueue queue1 = new PagingFilteredQueue("queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
Binding binding1 =
office.bindQueue("condition1", queue1);
- PagingFilteredQueue queue2 = new PagingFilteredQueue("queue2", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+ PagingFilteredQueue queue2 = new PagingFilteredQueue("queue2", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
Binding binding2 =
office.bindQueue("condition1", queue2);
- PagingFilteredQueue queue3 = new PagingFilteredQueue("queue3", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+ PagingFilteredQueue queue3 = new PagingFilteredQueue("queue3", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
Binding binding3 =
office.bindQueue("condition1", queue3);
- PagingFilteredQueue queue4 = new PagingFilteredQueue("queue4", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+ PagingFilteredQueue queue4 = new PagingFilteredQueue("queue4", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
Binding binding4 =
office.bindQueue("condition1", queue4);
- PagingFilteredQueue queue5 = new PagingFilteredQueue("queue5", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+ PagingFilteredQueue queue5 = new PagingFilteredQueue("queue5", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
Binding binding5 =
office.bindQueue("condition2", queue5);
- PagingFilteredQueue queue6 = new PagingFilteredQueue("queue6", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+ PagingFilteredQueue queue6 = new PagingFilteredQueue("queue6", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
Binding binding6 =
office.bindQueue("condition2", queue6);
- PagingFilteredQueue queue7 = new PagingFilteredQueue("queue7", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+ PagingFilteredQueue queue7 = new PagingFilteredQueue("queue7", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
Binding binding7 =
office.bindQueue("condition2", queue7);
- PagingFilteredQueue queue8 = new PagingFilteredQueue("queue8", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+ PagingFilteredQueue queue8 = new PagingFilteredQueue("queue8", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
Binding binding8 =
office.bindQueue("condition2", queue8);
@@ -354,32 +354,32 @@
postOffice = createPostOffice();
- PagingFilteredQueue queue1 = new PagingFilteredQueue("queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+ PagingFilteredQueue queue1 = new PagingFilteredQueue("queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
Binding binding1 =
postOffice.bindQueue("topic1", queue1);
- PagingFilteredQueue queue2 = new PagingFilteredQueue("queue2", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+ PagingFilteredQueue queue2 = new PagingFilteredQueue("queue2", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
Binding binding2 =
postOffice.bindQueue("topic1", queue2);
- PagingFilteredQueue queue3 = new PagingFilteredQueue("queue3", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+ PagingFilteredQueue queue3 = new PagingFilteredQueue("queue3", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
Binding binding3 =
postOffice.bindQueue("topic1", queue3);
- PagingFilteredQueue queue4 = new PagingFilteredQueue("queue4", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+ PagingFilteredQueue queue4 = new PagingFilteredQueue("queue4", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
Binding binding4 =
postOffice.bindQueue("topic2", queue4);
- PagingFilteredQueue queue5 = new PagingFilteredQueue("queue5", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+ PagingFilteredQueue queue5 = new PagingFilteredQueue("queue5", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
Binding binding5 =
postOffice.bindQueue("topic2", queue5);
- PagingFilteredQueue queue6 = new PagingFilteredQueue("queue6", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+ PagingFilteredQueue queue6 = new PagingFilteredQueue("queue6", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
Binding binding6 =
postOffice.bindQueue("topic2", queue6);
@@ -504,7 +504,7 @@
{
postOffice = createPostOffice();
- PagingFilteredQueue queue1 = new PagingFilteredQueue("queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+ PagingFilteredQueue queue1 = new PagingFilteredQueue("queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
Binding binding1 =
postOffice.bindQueue("condition1", queue1);
@@ -554,17 +554,17 @@
SimpleFilter filter = new SimpleFilter(2);
- PagingFilteredQueue queue1 = new PagingFilteredQueue("queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), filter);
+ PagingFilteredQueue queue1 = new PagingFilteredQueue("queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), filter);
Binding binding1 =
postOffice.bindQueue("topic1", queue1);
- PagingFilteredQueue queue2 = new PagingFilteredQueue("queue2", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+ PagingFilteredQueue queue2 = new PagingFilteredQueue("queue2", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
Binding binding2 =
postOffice.bindQueue("topic1", queue2);
- PagingFilteredQueue queue3 = new PagingFilteredQueue("queue3", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+ PagingFilteredQueue queue3 = new PagingFilteredQueue("queue3", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
Binding binding3 =
postOffice.bindQueue("topic1", queue3);
@@ -654,32 +654,32 @@
{
postOffice = createPostOffice();
- PagingFilteredQueue queue1 = new PagingFilteredQueue("queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+ PagingFilteredQueue queue1 = new PagingFilteredQueue("queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
Binding binding1 =
postOffice.bindQueue("topic1", queue1);
- PagingFilteredQueue queue2 = new PagingFilteredQueue("queue2", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+ PagingFilteredQueue queue2 = new PagingFilteredQueue("queue2", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
Binding binding2 =
postOffice.bindQueue("topic1", queue2);
- PagingFilteredQueue queue3 = new PagingFilteredQueue("queue3", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+ PagingFilteredQueue queue3 = new PagingFilteredQueue("queue3", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
Binding binding3 =
postOffice.bindQueue("topic1", queue3);
- PagingFilteredQueue queue4 = new PagingFilteredQueue("queue4", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null);
+ PagingFilteredQueue queue4 = new PagingFilteredQueue("queue4", channelIdManager.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null);
Binding binding4 =
postOffice.bindQueue("topic2", queue4);
- PagingFilteredQueue queue5 = new PagingFilteredQueue("queue5", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null);
+ PagingFilteredQueue queue5 = new PagingFilteredQueue("queue5", channelIdManager.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null);
Binding binding5 =
postOffice.bindQueue("topic2", queue5);
- PagingFilteredQueue queue6 = new PagingFilteredQueue("queue6", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null);
+ PagingFilteredQueue queue6 = new PagingFilteredQueue("queue6", channelIdManager.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null);
Binding binding6 =
postOffice.bindQueue("topic2", queue6);
@@ -826,12 +826,12 @@
{
postOffice = createPostOffice();
- PagingFilteredQueue queue1 = new PagingFilteredQueue("queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+ PagingFilteredQueue queue1 = new PagingFilteredQueue("queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
Binding binding1 =
postOffice.bindQueue("topic1", queue1);
- PagingFilteredQueue queue2 = new PagingFilteredQueue("queue2", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null);
+ PagingFilteredQueue queue2 = new PagingFilteredQueue("queue2", channelIdManager.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null);
Binding binding2 =
postOffice.bindQueue("topic1", queue2);
Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeTest.java 2006-10-04 23:41:46 UTC (rev 1433)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeTest.java 2006-10-05 16:10:31 UTC (rev 1434)
@@ -75,8 +75,7 @@
public void setUp() throws Exception
{
- super.setUp();
-
+ super.setUp();
}
public void tearDown() throws Exception
@@ -100,11 +99,11 @@
//Add a couple of bindings
- LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, 1, "sub1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+ LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, 1, "sub1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
Binding binding1 =
office1.bindClusteredQueue("topic1", queue1);
- LocalClusteredQueue queue2 = new LocalClusteredQueue(office1, 1, "sub2", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+ LocalClusteredQueue queue2 = new LocalClusteredQueue(office1, 1, "sub2", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
Binding binding2 =
office1.bindClusteredQueue("topic1", queue2);
@@ -123,7 +122,7 @@
//Add another binding on node 2
- LocalClusteredQueue queue3 = new LocalClusteredQueue(office2, 2, "sub3", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+ LocalClusteredQueue queue3 = new LocalClusteredQueue(office2, 2, "sub3", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
Binding binding3 =
office2.bindClusteredQueue("topic1", queue3);
@@ -150,7 +149,7 @@
//Add another binding on node 1
- LocalClusteredQueue queue4 = new LocalClusteredQueue(office2, 2, "sub4", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+ LocalClusteredQueue queue4 = new LocalClusteredQueue(office2, 2, "sub4", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
Binding binding4 =
office2.bindClusteredQueue("topic1", queue4);
@@ -214,7 +213,7 @@
//Add another binding on node 3
- LocalClusteredQueue queue5 = new LocalClusteredQueue(office3, 3, "sub5", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+ LocalClusteredQueue queue5 = new LocalClusteredQueue(office3, 3, "sub5", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
Binding binding5 =
office3.bindClusteredQueue("topic1", queue5);
@@ -250,12 +249,12 @@
//Add a durable and a non durable binding on node 1
- LocalClusteredQueue queue6 = new LocalClusteredQueue(office1, 1, "sub6", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
+ LocalClusteredQueue queue6 = new LocalClusteredQueue(office1, 1, "sub6", channelIdManager.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
Binding binding6 =
office1.bindClusteredQueue("topic1", queue6);
- LocalClusteredQueue queue7 = new LocalClusteredQueue(office1, 1, "sub7", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+ LocalClusteredQueue queue7 = new LocalClusteredQueue(office1, 1, "sub7", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
Binding binding7 =
office1.bindClusteredQueue("topic1", queue7);
@@ -423,7 +422,7 @@
if (office3 != null)
{
- office2.stop();
+ office3.stop();
}
checkNoBindingData();
@@ -451,26 +450,46 @@
clusteredTransactionalRoute(false);
}
- public void testClusteredNonPersistentRouteWithFilter() throws Throwable
+ public void testClusteredNonPersistentRouteWithFilterNonRecoverable() throws Throwable
{
- this.clusteredRouteWithFilter(false);
+ this.clusteredRouteWithFilter(false, false);
}
- public void testClusteredPersistentRouteWithFilter() throws Throwable
+ public void testClusteredPersistentRouteWithFilterNonRecoverable() throws Throwable
{
- this.clusteredRouteWithFilter(true);
+ this.clusteredRouteWithFilter(true, false);
}
- public void testRouteSharedPointToPointQueuePersistent() throws Throwable
+ public void testClusteredNonPersistentRouteWithFilterRecoverable() throws Throwable
{
- this.routeSharedQueue(true);
+ this.clusteredRouteWithFilter(false, true);
}
- public void testRouteSharedPointToPointQueueNonPersistent() throws Throwable
+ public void testClusteredPersistentRouteWithFilterRecoverable() throws Throwable
{
- this.routeSharedQueue(false);
+ this.clusteredRouteWithFilter(true, true);
}
+
+ public void testRouteSharedPointToPointQueuePersistentNonRecoverable() throws Throwable
+ {
+ this.routeSharedQueue(true, false);
+ }
+ public void testRouteSharedPointToPointQueueNonPersistentNonRecoverable() throws Throwable
+ {
+ this.routeSharedQueue(false, false);
+ }
+
+ public void testRouteSharedPointToPointQueuePersistentRecoverable() throws Throwable
+ {
+ this.routeSharedQueue(true, true);
+ }
+
+ public void testRouteSharedPointToPointQueueNonPersistentRecoverable() throws Throwable
+ {
+ this.routeSharedQueue(false, true);
+ }
+
public void testRouteComplexTopicPersistent() throws Throwable
{
this.routeComplexTopic(true);
@@ -480,18 +499,28 @@
{
this.routeComplexTopic(false);
}
+
+ public void testRouteLocalQueuesPersistentNonRecoverable() throws Throwable
+ {
+ this.routeLocalQueues(true, false);
+ }
- public void testRouteLocalQueuesPersistent() throws Throwable
+ public void testRouteLocalQueuesNonPersistentNonRecoverable() throws Throwable
{
- this.routeLocalQueues(true);
+ this.routeLocalQueues(false, false);
}
- public void testRouteLocalQueuesNonPersistent() throws Throwable
+ public void testRouteLocalQueuesPersistentRecoverable() throws Throwable
{
- this.routeLocalQueues(false);
+ this.routeLocalQueues(true, true);
}
+ public void testRouteLocalQueuesNonPersistentRecoverable() throws Throwable
+ {
+ this.routeLocalQueues(false, true);
+ }
+
/*
* We should allow the clustered bind of queues with the same queue name on different nodes of the
* cluster
@@ -508,15 +537,15 @@
office2 = createClusteredPostOffice(2, "testgroup");
- LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, 1, "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+ LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, 1, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
Binding binding1 = office1.bindClusteredQueue("queue1", queue1);
- LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, 2, "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+ LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, 2, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
Binding binding2 = office2.bindClusteredQueue("queue1", queue2);
- LocalClusteredQueue queue3 = new LocalClusteredQueue(office1, 1, "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+ LocalClusteredQueue queue3 = new LocalClusteredQueue(office1, 1, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
try
{
@@ -527,7 +556,7 @@
{
//Ok
}
- LocalClusteredQueue queue4 = new LocalClusteredQueue(office2, 2, "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+ LocalClusteredQueue queue4 = new LocalClusteredQueue(office2, 2, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
try
{
@@ -543,11 +572,11 @@
office2.unbindClusteredQueue("queue1");
- LocalClusteredQueue queue5 = new LocalClusteredQueue(office1, 1, "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+ LocalClusteredQueue queue5 = new LocalClusteredQueue(office1, 1, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
Binding binding5 = office1.bindClusteredQueue("queue1", queue5);
- PagingFilteredQueue queue6 = new PagingFilteredQueue("queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+ PagingFilteredQueue queue6 = new PagingFilteredQueue("queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
try
{
Binding binding6 = office1.bindQueue("queue1", queue6);
@@ -561,13 +590,13 @@
office1.unbindClusteredQueue("queue1");
//It should be possible to bind queues locally into a clustered post office
- LocalClusteredQueue queue7 = new LocalClusteredQueue(office1, 1, "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+ LocalClusteredQueue queue7 = new LocalClusteredQueue(office1, 1, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
Binding binding7 = office1.bindQueue("queue1", queue7);
- LocalClusteredQueue queue8 = new LocalClusteredQueue(office2, 2, "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+ LocalClusteredQueue queue8 = new LocalClusteredQueue(office2, 2, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
Binding binding8 = office2.bindQueue("queue1", queue8);
- LocalClusteredQueue queue9 = new LocalClusteredQueue(office2, 2, "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+ LocalClusteredQueue queue9 = new LocalClusteredQueue(office2, 2, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
try
{
Binding binding9 = office1.bindQueue("queue1", queue9);
@@ -599,7 +628,7 @@
// Protected -----------------------------------------------------
- protected void clusteredRouteWithFilter(boolean persistentMessage) throws Throwable
+ protected void clusteredRouteWithFilter(boolean persistentMessage, boolean recoverable) throws Throwable
{
ClusteredPostOffice office1 = null;
@@ -613,15 +642,15 @@
SimpleFilter filter1 = new SimpleFilter(2);
SimpleFilter filter2 = new SimpleFilter(3);
- LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, 1, "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), filter1, tr);
+ LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, 1, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), filter1, tr);
Binding binding1 =
office1.bindClusteredQueue("topic1", queue1);
- LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, 2, "queue2", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), filter2, tr);
+ LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, 2, "queue2", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), filter2, tr);
Binding binding2 =
office2.bindClusteredQueue("topic1", queue2);
- LocalClusteredQueue queue3 = new LocalClusteredQueue(office2, 2, "queue3", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+ LocalClusteredQueue queue3 = new LocalClusteredQueue(office2, 2, "queue3", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);
Binding binding3 =
office2.bindClusteredQueue("topic1", queue3);
@@ -721,52 +750,52 @@
LocalClusteredQueue[] queues = new LocalClusteredQueue[16];
Binding[] bindings = new Binding[16];
- queues[0] = new LocalClusteredQueue(office1, 1, "sub1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+ queues[0] = new LocalClusteredQueue(office1, 1, "sub1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
bindings[0] = office1.bindClusteredQueue("topic1", queues[0]);
- queues[1] = new LocalClusteredQueue(office1, 1, "sub2", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+ queues[1] = new LocalClusteredQueue(office1, 1, "sub2", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
bindings[1] = office1.bindClusteredQueue("topic1", queues[1]);
- queues[2] = new LocalClusteredQueue(office2, 2, "sub3", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+ queues[2] = new LocalClusteredQueue(office2, 2, "sub3", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
bindings[2] = office2.bindClusteredQueue("topic1", queues[2]);
- queues[3] = new LocalClusteredQueue(office2, 2, "sub4", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+ queues[3] = new LocalClusteredQueue(office2, 2, "sub4", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
bindings[3] = office2.bindClusteredQueue("topic1", queues[3]);
- queues[4] = new LocalClusteredQueue(office2, 2, "sub5", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
+ queues[4] = new LocalClusteredQueue(office2, 2, "sub5", channelIdManager.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
bindings[4] = office2.bindClusteredQueue("topic1", queues[4]);
- queues[5] = new LocalClusteredQueue(office1, 1, "sub6", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+ queues[5] = new LocalClusteredQueue(office1, 1, "sub6", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
bindings[5] = office1.bindClusteredQueue("topic1", queues[5]);
- queues[6] = new LocalClusteredQueue(office1, 1, "sub7", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
+ queues[6] = new LocalClusteredQueue(office1, 1, "sub7", channelIdManager.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
bindings[6] = office1.bindClusteredQueue("topic1", queues[6]);
- queues[7] = new LocalClusteredQueue(office1, 1, "sub8", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
+ queues[7] = new LocalClusteredQueue(office1, 1, "sub8", channelIdManager.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
bindings[7] = office1.bindClusteredQueue("topic1", queues[7]);
- queues[8] = new LocalClusteredQueue(office1, 1, "sub9", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+ queues[8] = new LocalClusteredQueue(office1, 1, "sub9", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
bindings[8] = office1.bindClusteredQueue("topic2", queues[8]);
- queues[9] = new LocalClusteredQueue(office1, 1, "sub10", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+ queues[9] = new LocalClusteredQueue(office1, 1, "sub10", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
bindings[9] = office1.bindClusteredQueue("topic2", queues[9]);
- queues[10] = new LocalClusteredQueue(office2, 2, "sub11", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+ queues[10] = new LocalClusteredQueue(office2, 2, "sub11", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
bindings[10] = office2.bindClusteredQueue("topic2", queues[10]);
- queues[11] = new LocalClusteredQueue(office2, 2, "sub12", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+ queues[11] = new LocalClusteredQueue(office2, 2, "sub12", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
bindings[11] = office2.bindClusteredQueue("topic2", queues[11]);
- queues[12] = new LocalClusteredQueue(office2, 2, "sub13", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
+ queues[12] = new LocalClusteredQueue(office2, 2, "sub13", channelIdManager.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
bindings[12] = office2.bindClusteredQueue("topic2", queues[12]);
- queues[13] = new LocalClusteredQueue(office1, 1, "sub14", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+ queues[13] = new LocalClusteredQueue(office1, 1, "sub14", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
bindings[13] = office1.bindClusteredQueue("topic2", queues[13]);
- queues[14] = new LocalClusteredQueue(office1, 1, "sub15", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
+ queues[14] = new LocalClusteredQueue(office1, 1, "sub15", channelIdManager.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
bindings[14] = office1.bindClusteredQueue("topic2", queues[14]);
- queues[15] = new LocalClusteredQueue(office1, 1, "sub16", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
+ queues[15] = new LocalClusteredQueue(office1, 1, "sub16", channelIdManager.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
bindings[15] = office1.bindClusteredQueue("topic2", queues[15]);
SimpleReceiver[] receivers = new SimpleReceiver[16];
@@ -882,7 +911,7 @@
}
}
- protected void routeSharedQueue(boolean persistentMessage) throws Throwable
+ protected void routeSharedQueue(boolean persistentMessage, boolean recoverable) throws Throwable
{
ClusteredPostOffice office1 = null;
@@ -908,27 +937,27 @@
//We deploy the queue on nodes 1, 2, 3, 4 and 5
//We don't deploy on node 6
- LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, 1, "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+ LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, 1, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);
Binding binding1 = office1.bindClusteredQueue("queue1", queue1);
SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
queue1.add(receiver1);
- LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, 2, "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+ LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, 2, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);
Binding binding2 = office2.bindClusteredQueue("queue1", queue2);
SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
queue2.add(receiver2);
- LocalClusteredQueue queue3 = new LocalClusteredQueue(office3, 3, "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+ LocalClusteredQueue queue3 = new LocalClusteredQueue(office3, 3, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);
Binding binding3 = office3.bindClusteredQueue("queue1", queue3);
SimpleReceiver receiver3 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
queue3.add(receiver3);
- LocalClusteredQueue queue4 = new LocalClusteredQueue(office4, 4, "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+ LocalClusteredQueue queue4 = new LocalClusteredQueue(office4, 4, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);
Binding binding4 = office4.bindClusteredQueue("queue1", queue4);
SimpleReceiver receiver4 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
queue4.add(receiver4);
- LocalClusteredQueue queue5 = new LocalClusteredQueue(office5, 5, "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+ LocalClusteredQueue queue5 = new LocalClusteredQueue(office5, 5, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);
Binding binding5 = office5.bindClusteredQueue("queue1", queue5);
SimpleReceiver receiver5 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
queue5.add(receiver5);
@@ -1044,7 +1073,7 @@
/*
* Clustered post offices should be able to have local queues bound to them too.
*/
- protected void routeLocalQueues(boolean persistentMessage) throws Throwable
+ protected void routeLocalQueues(boolean persistentMessage, boolean recoverable) throws Throwable
{
ClusteredPostOffice office1 = null;
@@ -1058,17 +1087,17 @@
office2 = createClusteredPostOffice(2, "testgroup");
office3 = createClusteredPostOffice(3, "testgroup");
- LocalClusteredQueue sub1 = new LocalClusteredQueue(office1, 1, "sub1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+ LocalClusteredQueue sub1 = new LocalClusteredQueue(office1, 1, "sub1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);
Binding binding1 = office1.bindQueue("topic", sub1);
SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
sub1.add(receiver1);
- LocalClusteredQueue sub2 = new LocalClusteredQueue(office2, 2, "sub2", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+ LocalClusteredQueue sub2 = new LocalClusteredQueue(office2, 2, "sub2", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);
Binding binding2 = office2.bindQueue("topic", sub2);
SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
sub2.add(receiver2);
- LocalClusteredQueue sub3 = new LocalClusteredQueue(office3, 3, "sub3", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+ LocalClusteredQueue sub3 = new LocalClusteredQueue(office3, 3, "sub3", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);
Binding binding3 = office3.bindQueue("topic", sub3);
SimpleReceiver receiver3 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
sub3.add(receiver3);
@@ -1172,13 +1201,13 @@
//======
//Non durable 1 on node 2
- LocalClusteredQueue nonDurable1 = new LocalClusteredQueue(office2, 2, "nondurable1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+ LocalClusteredQueue nonDurable1 = new LocalClusteredQueue(office2, 2, "nondurable1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
Binding binding1 = office2.bindClusteredQueue("topic", nonDurable1);
SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
nonDurable1.add(receiver1);
//Non durable 2 on node 2
- LocalClusteredQueue nonDurable2 = new LocalClusteredQueue(office2, 2, "nondurable2", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+ LocalClusteredQueue nonDurable2 = new LocalClusteredQueue(office2, 2, "nondurable2", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
Binding binding2 = office2.bindClusteredQueue("topic", nonDurable2);
SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
nonDurable2.add(receiver2);
@@ -1187,13 +1216,13 @@
//======
//Non shared durable
- LocalClusteredQueue nonSharedDurable1 = new LocalClusteredQueue(office3, 3, "nonshareddurable1", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
+ LocalClusteredQueue nonSharedDurable1 = new LocalClusteredQueue(office3, 3, "nonshareddurable1", channelIdManager.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
Binding binding3 = office3.bindClusteredQueue("topic", nonSharedDurable1);
SimpleReceiver receiver3 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
nonSharedDurable1.add(receiver3);
//Non durable
- LocalClusteredQueue nonDurable3 = new LocalClusteredQueue(office3, 3, "nondurable3", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+ LocalClusteredQueue nonDurable3 = new LocalClusteredQueue(office3, 3, "nondurable3", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
Binding binding4 = office3.bindClusteredQueue("topic", nonDurable3);
SimpleReceiver receiver4 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
nonDurable3.add(receiver4);
@@ -1202,31 +1231,31 @@
//======
//Shared durable
- LocalClusteredQueue sharedDurable1 = new LocalClusteredQueue(office4, 4, "shareddurable1", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
+ LocalClusteredQueue sharedDurable1 = new LocalClusteredQueue(office4, 4, "shareddurable1", channelIdManager.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
Binding binding5 = office4.bindClusteredQueue("topic", sharedDurable1);
SimpleReceiver receiver5 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
sharedDurable1.add(receiver5);
//Non shared durable
- LocalClusteredQueue nonSharedDurable2 = new LocalClusteredQueue(office4, 4, "nonshareddurable2", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
+ LocalClusteredQueue nonSharedDurable2 = new LocalClusteredQueue(office4, 4, "nonshareddurable2", channelIdManager.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
Binding binding6 = office4.bindClusteredQueue("topic", nonSharedDurable2);
SimpleReceiver receiver6 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
nonSharedDurable2.add(receiver6);
//Non durable
- LocalClusteredQueue nonDurable4 = new LocalClusteredQueue(office4, 4, "nondurable4", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+ LocalClusteredQueue nonDurable4 = new LocalClusteredQueue(office4, 4, "nondurable4", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
Binding binding7 = office4.bindClusteredQueue("topic", nonDurable4);
SimpleReceiver receiver7 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
nonDurable4.add(receiver7);
// Non durable
- LocalClusteredQueue nonDurable5 = new LocalClusteredQueue(office4, 4, "nondurable5", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+ LocalClusteredQueue nonDurable5 = new LocalClusteredQueue(office4, 4, "nondurable5", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
Binding binding8 = office4.bindClusteredQueue("topic", nonDurable5);
SimpleReceiver receiver8 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
nonDurable5.add(receiver8);
//Non durable
- LocalClusteredQueue nonDurable6 = new LocalClusteredQueue(office4, 4, "nondurable6", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+ LocalClusteredQueue nonDurable6 = new LocalClusteredQueue(office4, 4, "nondurable6", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
Binding binding9 = office4.bindClusteredQueue("topic", nonDurable6);
SimpleReceiver receiver9 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
nonDurable6.add(receiver9);
@@ -1234,32 +1263,32 @@
// Node 5
//=======
//Shared durable
- LocalClusteredQueue sharedDurable2 = new LocalClusteredQueue(office5, 5, "shareddurable1", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
+ LocalClusteredQueue sharedDurable2 = new LocalClusteredQueue(office5, 5, "shareddurable1", channelIdManager.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
Binding binding10 = office5.bindClusteredQueue("topic", sharedDurable2);
SimpleReceiver receiver10 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
sharedDurable2.add(receiver10);
//Shared durable
- LocalClusteredQueue sharedDurable3 = new LocalClusteredQueue(office5, 5, "shareddurable2", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
+ LocalClusteredQueue sharedDurable3 = new LocalClusteredQueue(office5, 5, "shareddurable2", channelIdManager.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
Binding binding11 = office5.bindClusteredQueue("topic", sharedDurable3);
SimpleReceiver receiver11 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
sharedDurable3.add(receiver11);
// Node 6
//=========
- LocalClusteredQueue sharedDurable4 = new LocalClusteredQueue(office6, 6, "shareddurable2", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
+ LocalClusteredQueue sharedDurable4 = new LocalClusteredQueue(office6, 6, "shareddurable2", channelIdManager.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
Binding binding12 = office6.bindClusteredQueue("topic", sharedDurable4);
SimpleReceiver receiver12 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
sharedDurable4.add(receiver12);
- LocalClusteredQueue nonDurable7 = new LocalClusteredQueue(office6, 6, "nondurable7", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+ LocalClusteredQueue nonDurable7 = new LocalClusteredQueue(office6, 6, "nondurable7", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
Binding binding13 = office6.bindClusteredQueue("topic", nonDurable7);
SimpleReceiver receiver13 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
nonDurable7.add(receiver13);
//Node 7
//=======
- LocalClusteredQueue sharedDurable5 = new LocalClusteredQueue(office7, 7, "shareddurable2", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
+ LocalClusteredQueue sharedDurable5 = new LocalClusteredQueue(office7, 7, "shareddurable2", channelIdManager.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
Binding binding14 = office7.bindClusteredQueue("topic", sharedDurable5);
SimpleReceiver receiver14 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
sharedDurable5.add(receiver14);
@@ -1588,52 +1617,52 @@
LocalClusteredQueue[] queues = new LocalClusteredQueue[16];
Binding[] bindings = new Binding[16];
- queues[0] = new LocalClusteredQueue(office1, 1, "sub1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+ queues[0] = new LocalClusteredQueue(office1, 1, "sub1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
bindings[0] = office1.bindClusteredQueue("topic1", queues[0]);
- queues[1] = new LocalClusteredQueue(office1, 1, "sub2", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+ queues[1] = new LocalClusteredQueue(office1, 1, "sub2", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
bindings[1] = office1.bindClusteredQueue("topic1", queues[1]);
- queues[2] = new LocalClusteredQueue(office2, 2, "sub3", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+ queues[2] = new LocalClusteredQueue(office2, 2, "sub3", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
bindings[2] = office2.bindClusteredQueue("topic1", queues[2]);
- queues[3] = new LocalClusteredQueue(office2, 2, "sub4", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+ queues[3] = new LocalClusteredQueue(office2, 2, "sub4", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
bindings[3] = office2.bindClusteredQueue("topic1", queues[3]);
- queues[4] = new LocalClusteredQueue(office2, 2, "sub5", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
+ queues[4] = new LocalClusteredQueue(office2, 2, "sub5", channelIdManager.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
bindings[4] = office2.bindClusteredQueue("topic1", queues[4]);
- queues[5] = new LocalClusteredQueue(office1, 1, "sub6", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+ queues[5] = new LocalClusteredQueue(office1, 1, "sub6", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
bindings[5] = office1.bindClusteredQueue("topic1", queues[5]);
- queues[6] = new LocalClusteredQueue(office1, 1, "sub7", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
+ queues[6] = new LocalClusteredQueue(office1, 1, "sub7", channelIdManager.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
bindings[6] = office1.bindClusteredQueue("topic1", queues[6]);
- queues[7] = new LocalClusteredQueue(office1, 1, "sub8", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
+ queues[7] = new LocalClusteredQueue(office1, 1, "sub8", channelIdManager.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
bindings[7] = office1.bindClusteredQueue("topic1", queues[7]);
- queues[8] = new LocalClusteredQueue(office1, 1, "sub9", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+ queues[8] = new LocalClusteredQueue(office1, 1, "sub9", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
bindings[8] = office1.bindClusteredQueue("topic2", queues[8]);
- queues[9] = new LocalClusteredQueue(office1, 1, "sub10", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+ queues[9] = new LocalClusteredQueue(office1, 1, "sub10", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
bindings[9] = office1.bindClusteredQueue("topic2", queues[9]);
- queues[10] = new LocalClusteredQueue(office2, 2, "sub11", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+ queues[10] = new LocalClusteredQueue(office2, 2, "sub11", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
bindings[10] = office2.bindClusteredQueue("topic2", queues[10]);
- queues[11] = new LocalClusteredQueue(office2, 2, "sub12", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+ queues[11] = new LocalClusteredQueue(office2, 2, "sub12", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
bindings[11] = office2.bindClusteredQueue("topic2", queues[11]);
- queues[12] = new LocalClusteredQueue(office2, 2, "sub13", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
+ queues[12] = new LocalClusteredQueue(office2, 2, "sub13", channelIdManager.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
bindings[12] = office2.bindClusteredQueue("topic2", queues[12]);
- queues[13] = new LocalClusteredQueue(office1, 1, "sub14", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+ queues[13] = new LocalClusteredQueue(office1, 1, "sub14", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
bindings[13] = office1.bindClusteredQueue("topic2", queues[13]);
- queues[14] = new LocalClusteredQueue(office1, 1, "sub15", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
+ queues[14] = new LocalClusteredQueue(office1, 1, "sub15", channelIdManager.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
bindings[14] = office1.bindClusteredQueue("topic2", queues[14]);
- queues[15] = new LocalClusteredQueue(office1, 1, "sub16", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
+ queues[15] = new LocalClusteredQueue(office1, 1, "sub16", channelIdManager.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
bindings[15] = office1.bindClusteredQueue("topic2", queues[15]);
SimpleReceiver[] receivers = new SimpleReceiver[16];
Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultRouterTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultRouterTest.java 2006-10-04 23:41:46 UTC (rev 1433)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultRouterTest.java 2006-10-05 16:10:31 UTC (rev 1434)
@@ -123,27 +123,27 @@
office6 = createClusteredPostOffice(6, "testgroup");
- LocalClusteredQueue queue1 = new LocalClusteredQueue(office2, 2, "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+ LocalClusteredQueue queue1 = new LocalClusteredQueue(office2, 2, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
Binding binding1 = office2.bindClusteredQueue("topic", queue1);
SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
queue1.add(receiver1);
- LocalClusteredQueue queue2 = new LocalClusteredQueue(office3, 3, "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+ LocalClusteredQueue queue2 = new LocalClusteredQueue(office3, 3, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
Binding binding2 = office3.bindClusteredQueue("topic", queue2);
SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
queue2.add(receiver2);
- LocalClusteredQueue queue3 = new LocalClusteredQueue(office4, 4, "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+ LocalClusteredQueue queue3 = new LocalClusteredQueue(office4, 4, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
Binding binding3 = office4.bindClusteredQueue("topic", queue3);
SimpleReceiver receiver3 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
queue3.add(receiver3);
- LocalClusteredQueue queue4 = new LocalClusteredQueue(office5, 5, "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+ LocalClusteredQueue queue4 = new LocalClusteredQueue(office5, 5, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
Binding binding4 = office5.bindClusteredQueue("topic", queue4);
SimpleReceiver receiver4 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
queue4.add(receiver4);
- LocalClusteredQueue queue5 = new LocalClusteredQueue(office6, 6, "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+ LocalClusteredQueue queue5 = new LocalClusteredQueue(office6, 6, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
Binding binding5 = office6.bindClusteredQueue("topic", queue5);
SimpleReceiver receiver5 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
queue5.add(receiver5);
@@ -262,27 +262,27 @@
office6 = createClusteredPostOffice(6, "testgroup");
- LocalClusteredQueue queue1 = new LocalClusteredQueue(office2, 2, "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+ LocalClusteredQueue queue1 = new LocalClusteredQueue(office2, 2, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
Binding binding1 = office2.bindClusteredQueue("topic", queue1);
SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
queue1.add(receiver1);
- LocalClusteredQueue queue2 = new LocalClusteredQueue(office3, 3, "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+ LocalClusteredQueue queue2 = new LocalClusteredQueue(office3, 3, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
Binding binding2 = office3.bindClusteredQueue("topic", queue2);
SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
queue2.add(receiver2);
- LocalClusteredQueue queue3 = new LocalClusteredQueue(office4, 4, "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+ LocalClusteredQueue queue3 = new LocalClusteredQueue(office4, 4, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
Binding binding3 = office4.bindClusteredQueue("topic", queue3);
SimpleReceiver receiver3 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
queue3.add(receiver3);
- LocalClusteredQueue queue4 = new LocalClusteredQueue(office5, 5, "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+ LocalClusteredQueue queue4 = new LocalClusteredQueue(office5, 5, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
Binding binding4 = office5.bindClusteredQueue("topic", queue4);
SimpleReceiver receiver4 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
queue4.add(receiver4);
- LocalClusteredQueue queue5 = new LocalClusteredQueue(office6, 6, "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+ LocalClusteredQueue queue5 = new LocalClusteredQueue(office6, 6, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
Binding binding5 = office6.bindClusteredQueue("topic", queue5);
SimpleReceiver receiver5 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
queue5.add(receiver5);
Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RedistributionTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RedistributionTest.java 2006-10-04 23:41:46 UTC (rev 1433)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RedistributionTest.java 2006-10-05 16:10:31 UTC (rev 1434)
@@ -69,18 +69,28 @@
super.tearDown();
}
- public void testRedistNonPersistent() throws Throwable
+ public void testRedistNonPersistentNonRecoverable() throws Throwable
{
- redistTest(false);
+ redistTest(false, false);
}
- public void testRedistPersistent() throws Throwable
+ public void testRedistPersistentNonRecoverable() throws Throwable
{
- redistTest(true);
+ redistTest(true, false);
}
- public void redistTest(boolean persistent) throws Throwable
+ public void testRedistNonPersistentRecoverable() throws Throwable
{
+ redistTest(false, true);
+ }
+
+ public void testRedistPersistentRecoverable() throws Throwable
+ {
+ redistTest(true, true);
+ }
+
+ public void redistTest(boolean persistent, boolean recoverable) throws Throwable
+ {
ClusteredPostOffice office1 = null;
ClusteredPostOffice office2 = null;
@@ -105,19 +115,19 @@
log.info("Started offices");
- LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, 1, "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+ LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, 1, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);
Binding binding1 = office1.bindClusteredQueue("queue1", queue1);
- LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, 2, "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+ LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, 2, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);
Binding binding2 = office2.bindClusteredQueue("queue1", queue2);
- LocalClusteredQueue queue3 = new LocalClusteredQueue(office3, 3, "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+ LocalClusteredQueue queue3 = new LocalClusteredQueue(office3, 3, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);
Binding binding3 = office3.bindClusteredQueue("queue1", queue3);
- LocalClusteredQueue queue4 = new LocalClusteredQueue(office4, 4, "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+ LocalClusteredQueue queue4 = new LocalClusteredQueue(office4, 4, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);
Binding binding4 = office4.bindClusteredQueue("queue1", queue4);
- LocalClusteredQueue queue5 = new LocalClusteredQueue(office5, 5, "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+ LocalClusteredQueue queue5 = new LocalClusteredQueue(office5, 5, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);
Binding binding5 = office5.bindClusteredQueue("queue1", queue5);
log.info("bound queues");
More information about the jboss-cvs-commits
mailing list