[Jboss-cvs] JBoss Messaging SVN: r1360 - in trunk: src/main/org/jboss/jms/server/destination src/main/org/jboss/jms/server/endpoint src/main/org/jboss/messaging/core src/main/org/jboss/messaging/core/plugin src/main/org/jboss/messaging/core/plugin/postoffice/cluster tests/src/org/jboss/test/messaging/core/plugin/base tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Sat Sep 23 10:21:25 EDT 2006
Author: timfox
Date: 2006-09-23 10:21:10 -0400 (Sat, 23 Sep 2006)
New Revision: 1360
Modified:
trunk/src/main/org/jboss/jms/server/destination/DestinationServiceSupport.java
trunk/src/main/org/jboss/jms/server/destination/QueueService.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
trunk/src/main/org/jboss/messaging/core/ChannelSupport.java
trunk/src/main/org/jboss/messaging/core/Delivery.java
trunk/src/main/org/jboss/messaging/core/PagingChannelSupport.java
trunk/src/main/org/jboss/messaging/core/SimpleDelivery.java
trunk/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultMessagePullPolicy.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultRouter.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PostOfficeInternal.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesRequest.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RemoteQueueStub.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/StatsSender.java
trunk/tests/src/org/jboss/test/messaging/core/plugin/base/ClusteringTestBase.java
trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeTest.java
trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultRouterTest.java
trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/JGroupsUtil.java
trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RedistributionTest.java
Log:
More clustering
Modified: trunk/src/main/org/jboss/jms/server/destination/DestinationServiceSupport.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/destination/DestinationServiceSupport.java 2006-09-23 01:37:20 UTC (rev 1359)
+++ trunk/src/main/org/jboss/jms/server/destination/DestinationServiceSupport.java 2006-09-23 14:21:10 UTC (rev 1360)
@@ -17,6 +17,7 @@
import org.jboss.messaging.core.plugin.contract.MessageStore;
import org.jboss.messaging.core.plugin.contract.PersistenceManager;
import org.jboss.messaging.core.plugin.contract.PostOffice;
+import org.jboss.messaging.core.tx.TransactionRepository;
import org.jboss.system.ServiceMBeanSupport;
import org.w3c.dom.Element;
@@ -60,6 +61,8 @@
protected MessageStore ms;
+ protected TransactionRepository tr;
+
protected IdManager idm;
protected String nodeId;
@@ -97,6 +100,8 @@
ms = serverPeer.getMessageStore();
+ tr = serverPeer.getTxRepository();
+
idm = serverPeer.getChannelIdManager();
nodeId = serverPeer.getServerPeerID();
Modified: trunk/src/main/org/jboss/jms/server/destination/QueueService.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/destination/QueueService.java 2006-09-23 01:37:20 UTC (rev 1359)
+++ trunk/src/main/org/jboss/jms/server/destination/QueueService.java 2006-09-23 14:21:10 UTC (rev 1360)
@@ -117,7 +117,7 @@
else
{
queue = new LocalClusteredQueue(postOffice, nodeId, destination.getName(), idm.getId(), ms, pm, true, true,
- executor, null,
+ executor, null, tr,
destination.getFullSize(), destination.getPageSize(), destination.getDownCacheSize());
((ClusteredPostOffice)postOffice).bindClusteredQueue(destination.getName(), (LocalClusteredQueue)queue);
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2006-09-23 01:37:20 UTC (rev 1359)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2006-09-23 14:21:10 UTC (rev 1360)
@@ -63,6 +63,7 @@
import org.jboss.messaging.core.plugin.contract.PostOffice;
import org.jboss.messaging.core.plugin.postoffice.Binding;
import org.jboss.messaging.core.plugin.postoffice.cluster.LocalClusteredQueue;
+import org.jboss.messaging.core.tx.TransactionRepository;
import org.jboss.util.id.GUID;
import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
@@ -102,6 +103,7 @@
private DestinationManager dm;
private IdManager idm;
private QueuedExecutorPool pool;
+ private TransactionRepository tr;
private PostOffice topicPostOffice;
private PostOffice queuePostOffice;
private String nodeId;
@@ -126,6 +128,7 @@
idm = sp.getChannelIdManager();
pool = sp.getQueuedExecutorPool();
nodeId = sp.getServerPeerID();
+ tr = sp.getTxRepository();
consumers = new HashMap();
browsers = new HashMap();
@@ -244,7 +247,7 @@
else
{
q = new LocalClusteredQueue(topicPostOffice, nodeId, name, idm.getId(), ms, pm, true, true,
- executor, selector,
+ executor, selector, tr,
mDest.getFullSize(),
mDest.getPageSize(),
mDest.getDownCacheSize());
@@ -302,7 +305,7 @@
else
{
q = new LocalClusteredQueue(topicPostOffice, nodeId, name, idm.getId(), ms, pm, true, true,
- executor, selector,
+ executor, selector, tr,
mDest.getFullSize(),
mDest.getPageSize(),
mDest.getDownCacheSize());
Modified: trunk/src/main/org/jboss/messaging/core/ChannelSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/ChannelSupport.java 2006-09-23 01:37:20 UTC (rev 1359)
+++ trunk/src/main/org/jboss/messaging/core/ChannelSupport.java 2006-09-23 14:21:10 UTC (rev 1360)
@@ -172,7 +172,7 @@
}
else
{
- return handleInternal(sender, ref, tx, true);
+ return handleInternal(sender, ref, tx, true, false);
}
}
@@ -182,26 +182,10 @@
{
if (trace) { log.trace("acknowledging " + d + (tx == null ? " non-transactionally" : " transactionally in " + tx)); }
- if (tx == null)
- {
- // acknowledge non transactionally
-
- // TODO We should consider also executing acks on the event queue
- acknowledgeInternal(d, true);
- }
- else
- {
- this.getCallback(tx).addDelivery(d);
-
- if (trace) { log.trace(this + " added " + d + " to memory on transaction " + tx); }
-
- if (recoverable && d.getReference().isReliable())
- {
- pm.removeReference(channelID, d.getReference(), tx);
- }
- }
+ this.acknowledgeInternal(d, tx, true, false);
}
-
+
+
public void cancel(Delivery d) throws Throwable
{
// TODO We should also consider executing cancels on the event queue
@@ -325,6 +309,9 @@
{
future = new Future();
}
+ //TODO we should keep track of how many deliveries are currently in the queue
+ //so we don't execute another delivery when one is in the queue, since
+ //this is pointless
this.executor.execute(new DeliveryRunnable(future));
@@ -583,7 +570,7 @@
Delivery delivery = new SimpleDelivery(this, ref, true);
- acknowledgeInternal(delivery, true);
+ acknowledgeInternal(delivery, null, true, false);
}
else
{
@@ -694,7 +681,7 @@
}
protected Delivery handleInternal(DeliveryObserver sender, MessageReference ref,
- Transaction tx, boolean persist)
+ Transaction tx, boolean persist, boolean synchronous)
{
if (ref == null)
{
@@ -771,7 +758,8 @@
else
{
// add to post commit callback
- this.getCallback(tx).addRef(ref);
+ getCallback(tx, synchronous).addRef(ref);
+
if (trace) { log.trace(this + " added transactionally " + ref + " in memory"); }
}
@@ -798,19 +786,33 @@
return new SimpleDelivery(this, ref, true);
}
- protected void acknowledgeInternal(Delivery d, boolean persist) throws Exception
- {
- synchronized (deliveryLock)
+ protected void acknowledgeInternal(Delivery d, Transaction tx, boolean persist, boolean synchronous) throws Exception
+ {
+ if (tx == null)
{
- acknowledgeInMemory(d);
+ synchronized (deliveryLock)
+ {
+ acknowledgeInMemory(d);
+ }
+
+ if (persist && recoverable && d.getReference().isReliable())
+ {
+ pm.removeReference(channelID, d.getReference(), null);
+ }
+
+ d.getReference().releaseMemoryReference();
}
-
- if (persist && recoverable && d.getReference().isReliable())
+ else
{
- pm.removeReference(channelID, d.getReference(), null);
+ this.getCallback(tx, synchronous).addDelivery(d);
+
+ if (trace) { log.trace(this + " added " + d + " to memory on transaction " + tx); }
+
+ if (recoverable && d.getReference().isReliable())
+ {
+ pm.removeReference(channelID, d.getReference(), tx);
+ }
}
-
- d.getReference().releaseMemoryReference();
}
protected boolean acknowledgeInMemory(Delivery d)
@@ -830,16 +832,24 @@
return removed;
}
- protected InMemoryCallback getCallback(Transaction tx)
+ protected InMemoryCallback getCallback(Transaction tx, boolean synchronous)
{
- InMemoryCallback callback = (InMemoryCallback) tx.getCallback(this);
+ InMemoryCallback callback = (InMemoryCallback) tx.getCallback(this);
if (callback == null)
{
- callback = new InMemoryCallback();
+ callback = new InMemoryCallback(synchronous);
tx.addCallback(callback, this);
}
+ else
+ {
+ //Sanity
+ if (callback.isSynchronous() != synchronous)
+ {
+ throw new IllegalStateException("Callback synchronousness status doesn't match");
+ }
+ }
return callback;
}
@@ -859,13 +869,26 @@
private List refsToAdd;
private List deliveriesToRemove;
+
+ private boolean synchronous;
+
+ private boolean committing;
- private InMemoryCallback()
+ private Future result;
+
+ private InMemoryCallback(boolean synchronous)
{
refsToAdd = new ArrayList();
deliveriesToRemove = new ArrayList();
+
+ this.synchronous = synchronous;
}
+
+ private boolean isSynchronous()
+ {
+ return synchronous;
+ }
private void addRef(MessageReference ref)
{
@@ -896,11 +919,7 @@
{
// NOOP
}
-
- private boolean committing;
-
- private Future result;
-
+
public void run()
{
try
@@ -913,13 +932,7 @@
{
doAfterRollback();
}
-
- // prompt delivery
- if (receiversReady)
- {
- deliverInternal(true);
- }
-
+
result.setResult(null);
}
catch (Throwable t)
@@ -928,24 +941,46 @@
}
}
- public void afterCommit(boolean onePhase) throws TransactionException
+ public void afterCommit(boolean onePhase) throws Exception
{
- // We don't execute the commit directly, we add it to the event queue
- // of the channel
- // so it is executed in turn
- committing = true;
-
- executeAndWaitForResult();
+ if (synchronous)
+ {
+ try
+ {
+ doAfterCommit();
+ }
+ catch (Throwable t)
+ {
+ //TODO Sort out exception handling!!
+ throw new TransactionException("Failed to commit", t);
+ }
+ }
+ else
+ {
+ // We don't execute the commit directly, we add it to the event queue
+ // of the channel
+ // so it is executed in turn
+ committing = true;
+
+ executeAndWaitForResult();
+ }
}
- public void afterRollback(boolean onePhase) throws TransactionException
+ public void afterRollback(boolean onePhase) throws Exception
{
- // We don't execute the commit directly, we add it to the event queue
- // of the channel
- // so it is executed in turn
- committing = false;
-
- executeAndWaitForResult();
+ if (synchronous)
+ {
+ doAfterRollback();
+ }
+ else
+ {
+ // We don't execute the commit directly, we add it to the event queue
+ // of the channel
+ // so it is executed in turn
+ committing = false;
+
+ executeAndWaitForResult();
+ }
}
public String toString()
@@ -989,7 +1024,7 @@
}
}
- private void doAfterCommit() throws TransactionException
+ private void doAfterCommit() throws Throwable
{
// We add the references to the state
@@ -1035,6 +1070,12 @@
throw new TransactionException("Failed to ack message", t);
}
}
+
+ //prompt delivery
+ if (receiversReady)
+ {
+ deliverInternal(true);
+ }
}
private void doAfterRollback()
@@ -1086,21 +1127,27 @@
public void run()
{
- try
+ try
{
- receiversReady = true;
-
- deliverInternal(false);
-
- if (result != null)
- {
- result.setResult(null);
+ if (router.numberOfReceivers() > 0)
+ {
+ receiversReady = true;
+
+ deliverInternal(false);
+
+ if (result != null)
+ {
+ result.setResult(null);
+ }
}
}
catch (Throwable t)
{
log.error("Failed to deliver", t);
- result.setException(t);
+ if (result != null)
+ {
+ result.setException(t);
+ }
}
}
}
@@ -1125,7 +1172,7 @@
public void run()
{
- Delivery d = handleInternal(sender, ref, null, persist);
+ Delivery d = handleInternal(sender, ref, null, persist, false);
result.setResult(d);
}
}
Modified: trunk/src/main/org/jboss/messaging/core/Delivery.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/Delivery.java 2006-09-23 01:37:20 UTC (rev 1359)
+++ trunk/src/main/org/jboss/messaging/core/Delivery.java 2006-09-23 14:21:10 UTC (rev 1360)
@@ -52,5 +52,5 @@
void acknowledge(Transaction tx) throws Throwable;
void cancel() throws Throwable;
-
+
}
Modified: trunk/src/main/org/jboss/messaging/core/PagingChannelSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/PagingChannelSupport.java 2006-09-23 01:37:20 UTC (rev 1359)
+++ trunk/src/main/org/jboss/messaging/core/PagingChannelSupport.java 2006-09-23 14:21:10 UTC (rev 1360)
@@ -247,10 +247,6 @@
firstPagingOrder = nextPagingOrder = 0;
}
- log.info("Loading channel");
-
- log.info("Got " + ili.getRefInfos().size() + " intial refs");
-
Map refMap = processReferences(ili.getRefInfos());
Iterator iter = ili.getRefInfos().iterator();
@@ -436,7 +432,6 @@
if (messageRefs.size() <= fullSize - numberLoadable)
{
//This will flush the down cache too
- log.info("Loading " + numberLoadable + " refs");
loadPagedReferences(numberLoadable);
return true;
Modified: trunk/src/main/org/jboss/messaging/core/SimpleDelivery.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/SimpleDelivery.java 2006-09-23 01:37:20 UTC (rev 1359)
+++ trunk/src/main/org/jboss/messaging/core/SimpleDelivery.java 2006-09-23 14:21:10 UTC (rev 1360)
@@ -48,7 +48,7 @@
protected boolean selectorAccepted;
protected DeliveryObserver observer;
protected MessageReference reference;
-
+
private boolean trace = log.isTraceEnabled();
// Constructors --------------------------------------------------
@@ -89,8 +89,8 @@
this.observer = observer;
this.selectorAccepted = selectorAccepted;
}
+
-
// Delivery implementation ---------------------------------
public MessageReference getReference()
@@ -148,7 +148,7 @@
observer.cancel(this);
cancelled = true;
}
-
+
// Public --------------------------------------------------------
public String toString()
Modified: trunk/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java 2006-09-23 01:37:20 UTC (rev 1359)
+++ trunk/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java 2006-09-23 14:21:10 UTC (rev 1360)
@@ -293,8 +293,6 @@
ps = conn.prepareStatement(getSQLStatement("UPDATE_RELIABLE_REFS_NOT_PAGED"));
- log.info(getSQLStatement("UPDATE_RELIABLE_REFS_NOT_PAGED"));
-
ps.setLong(1, orderStart);
ps.setLong(2, orderEnd);
@@ -1181,8 +1179,6 @@
Long minOrdering = new Long(rs.getLong(1));
- log.info("min ordering is: " + minOrdering);
-
if (rs.wasNull())
{
minOrdering = null;
@@ -1195,8 +1191,6 @@
maxOrdering = null;
}
- log.info("Min ordering: " + minOrdering + " max Ordering: " + maxOrdering);
-
ps = conn.prepareStatement(getSQLStatement("LOAD_UNPAGED_REFS"));
ps.setLong(1, channelID);
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java 2006-09-23 01:37:20 UTC (rev 1359)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java 2006-09-23 14:21:10 UTC (rev 1360)
@@ -43,7 +43,6 @@
import org.jboss.messaging.core.FilterFactory;
import org.jboss.messaging.core.MessageReference;
import org.jboss.messaging.core.Queue;
-import org.jboss.messaging.core.SimpleDelivery;
import org.jboss.messaging.core.plugin.contract.ClusteredPostOffice;
import org.jboss.messaging.core.plugin.contract.MessageStore;
import org.jboss.messaging.core.plugin.contract.PersistenceManager;
@@ -406,7 +405,7 @@
ClusteredQueue queue = (ClusteredQueue)del.getObserver();
- log.info("Routing message to queue:" + queue.getName() + " on node " + queue.getNodeId());
+ // log.info("Routing message to queue:" + queue.getName() + " on node " + queue.getNodeId());
if (router.numberOfReceivers() > 1)
{
@@ -446,7 +445,7 @@
{
if (numberRemote == 1)
{
- log.info("unicast no tx");
+ // log.info("unicast no tx");
//Unicast - only one node is interested in the message
//FIXME - temporarily commented out until can get unicast to work
@@ -455,7 +454,7 @@
}
else
{
- log.info("multicast no tx");
+ // log.info("multicast no tx");
//Multicast - more than one node is interested
asyncSendRequest(new MessageRequest(condition, ref.getMessage(), queueNameNodeIdMap));
}
@@ -587,56 +586,11 @@
}
}
- public void addToQueue(String queueName, List messages) throws Exception
- {
- lock.readLock().acquire();
-
- try
- {
- Binding binding = this.getBindingForQueueName(queueName);
-
- if (binding == null)
- {
- throw new IllegalStateException("Cannot find binding for queue name " + queueName);
- }
-
- LocalClusteredQueue queue = (LocalClusteredQueue)binding.getQueue();
-
- Iterator iter = messages.iterator();
-
- while (iter.hasNext())
- {
- MessageReference ref = null;
-
- try
- {
- org.jboss.messaging.core.Message msg = (org.jboss.messaging.core.Message)iter.next();
-
- ref = ms.reference(msg);
-
- queue.handleFromCluster(ref);
- }
- finally
- {
- if (ref != null)
- {
- ref.releaseMemoryReference();
- }
- }
- }
- }
- finally
- {
-
- lock.readLock().release();
- }
- }
-
public void routeFromCluster(org.jboss.messaging.core.Message message, String routingKey,
Map queueNameNodeIdMap) throws Exception
{
- log.info(this.nodeId + " received route from cluster, ref = " + message.getMessageID() + " routing key " +
- routingKey + " map " + queueNameNodeIdMap);
+ // log.info(this.nodeId + " received route from cluster, ref = " + message.getMessageID() + " routing key " +
+ // routingKey + " map " + queueNameNodeIdMap);
lock.readLock().acquire();
@@ -649,12 +603,8 @@
// We route on the condition
DefaultClusteredBindings cb = (DefaultClusteredBindings)conditionMap.get(routingKey);
- // log.info("cb is " + cb);
-
if (cb != null)
{
- // log.info("cb size is " + cb.getAllBindings().size());
-
Collection bindings = cb.getAllBindings();
Iterator iter = bindings.iterator();
@@ -667,7 +617,6 @@
{
boolean handle = true;
- //log.info("Queue map is: " + queueNameNodeIdMap);
if (queueNameNodeIdMap != null)
{
String desiredNodeId = (String)queueNameNodeIdMap.get(binding.getQueue().getName());
@@ -687,22 +636,9 @@
LocalClusteredQueue queue = (LocalClusteredQueue)binding.getQueue();
- log.info(queue.getName() + " is handling it on node " + queue.getNodeId());
-
Delivery del = queue.handleFromCluster(ref);
-
- //log.info("Handled it: " + del);
- //log.info("accepted: " +del.isSelectorAccepted());
}
- else
- {
- log.info(this.nodeId + " not handling it");
- }
}
- else
- {
- //log.info("wrong node");
- }
}
}
}
@@ -816,9 +752,9 @@
}
}
- public void sendStats() throws Exception
+ public void sendQueueStats() throws Exception
{
- lock.writeLock().acquire();
+ lock.readLock().acquire();
List statsList = null;
@@ -839,7 +775,9 @@
if (q.isActive())
{
QueueStats stats = q.getStats();
-
+
+ //log.info(q.getNodeId() + " queue " + stats.getQueueName() + " count " + stats.getMessageCount());
+
//We don't bother sending the stats if there's no significant change in the values
if (q.changedSignificantly())
@@ -851,13 +789,17 @@
statsList.add(stats);
}
+ else
+ {
+ //log.info("Not changed significantly");
+ }
}
}
}
}
finally
{
- lock.writeLock().release();
+ lock.readLock().release();
}
if (statsList != null)
@@ -870,7 +812,7 @@
public void updateQueueStats(String nodeId, List statsList) throws Exception
{
- lock.writeLock().acquire();
+ lock.readLock().acquire();
try
{
@@ -916,7 +858,7 @@
if (toQueue != null)
{
- localQueue.setPullQueue(toQueue);
+ localQueue.setPullInfo(toQueue, pullSize);
//We now trigger delivery - this may cause a pull event
localQueue.deliver(false);
@@ -927,7 +869,7 @@
}
finally
{
- lock.writeLock().release();
+ lock.readLock().release();
}
}
@@ -951,12 +893,35 @@
return dels;
}
-
- public void pullMessages(ClusteredQueue localQueue, ClusteredQueue remoteQueue) throws Throwable
+
+ public Address getAddressForNodeId(String nodeId) throws Exception
{
- pullMessages(localQueue, remoteQueue, pullSize);
+ lock.readLock().acquire();
+
+ try
+ {
+ return (Address)nodeIdAddressMap.get(nodeId);
+ }
+ finally
+ {
+ lock.readLock().release();
+ }
}
-
+
+ /*
+ * Unicast a sync request
+ */
+ public Object syncSendRequest(ClusterRequest request, Address address) throws Exception
+ {
+ byte[] bytes = writeRequest(request);
+
+ Message message = new Message(address, null, bytes);
+
+ Object result = controlMessageDispatcher.sendMessage(message, GroupRequest.GET_FIRST, castTimeout);
+
+ return result;
+ }
+
// Public ------------------------------------------------------------------------------------------
// Protected ---------------------------------------------------------------------------------------
@@ -1070,7 +1035,7 @@
QueuedExecutor executor = (QueuedExecutor)pool.get();
queue = new LocalClusteredQueue(this, nodeId, queueName, channelId, ms, pm, true,
- durable, executor, filter);
+ durable, executor, filter, tr);
}
else
{
@@ -1086,105 +1051,7 @@
// Private ------------------------------------------------------------------------------------------
- /**
- * TODO This can probably be moved into LocalClusteredQueue
- *
- * Pull messages from a remote queue to a local queue.
- * If any of the messages are reliable then this needs to be done reliable (i.e. without loss or redelivery)
- * Normally this would require 2PC which would make performance suck.
- * However since we know both queues share the same DB then we can do the persistence locally in the same
- * tx thus avoiding 2PC and maintaining reliability:)
- * We do the following:
- *
- * 1. A tx is started locally
- * 2. Create deliveries for message(s) on the remote node - bring messages back to the local node
- * We send a message to the remote node to retrieve a set of deliveries from the queue - it gets a max of num
- * deliveries.
- * The unreliable ones can be acknowledged immediately, the reliable ones are not acknowledged and a holding transaction
- * is placed in the holding area on the remote node, which contains knowledge of the deliveries.
- * The messages corresponding to the deliveries are returned to the local node
- * 3. The retrieved messages are added to the local queue in the tx
- * 4. Deliveries corresponding to the messages retrieved are acknowledged LOCALLY for the remote queue.
- * 5. The local tx is committed.
- * 6. Send "commit" message to remote node
- * 7. "Commit" message is received and deliveries in the holding transaction are acknowledged IN MEMORY only.
- * On failure, commit or rollback will be called on the holding transaction causing the deliveries to be acked or cancelled
- * depending on whether they exist in the database
- */
- private void pullMessages(ClusteredQueue localQueue, ClusteredQueue remoteQueue, int num) throws Throwable
- {
- Address fromAddress = (Address)nodeIdAddressMap.get(remoteQueue.getNodeId());
-
- if (fromAddress == null)
- {
- //This is ok - the node might have left the group
- return;
- }
- Transaction tx = tr.createTransaction();
-
- ClusterRequest req = new PullMessagesRequest(this.nodeId, tx.getId(), remoteQueue.getChannelID(),
- localQueue.getName(), num);
-
- byte[] bytes = (byte[])syncSendRequest(req, fromAddress);
-
- PullMessagesResponse response = new PullMessagesResponse();
-
- StreamUtils.fromBytes(response, bytes);
-
- List msgs = response.getMessages();
-
- log.info("I have " + msgs.size() + " messages");
-
- Iterator iter = msgs.iterator();
-
- while (iter.hasNext())
- {
- org.jboss.messaging.core.Message msg = (org.jboss.messaging.core.Message)iter.next();
-
- MessageReference ref = null;
-
- try
- {
- ref = ms.reference(msg);
-
- Delivery delRet = localQueue.handle(null, ref, tx);
-
- if (delRet == null || !delRet.isSelectorAccepted())
- {
- //This should never happen
- throw new IllegalStateException("Aaarrgg queue did not accept reference");
- }
- }
- finally
- {
- if (ref != null)
- {
- ref.releaseMemoryReference();
- }
- }
-
- Delivery del = new SimpleDelivery(localQueue, ref);
-
- del.acknowledge(tx);
- }
-
- tx.commit();
-
- //TODO what if commit throws an exception - this means the commit message doesn't hit the
- //remote node so the holding transaction stays in the holding area
- //Need to catch the exception and throw a check message
- //What we need to do is catch any exceptions at the top of the call, i.e. just after the interface
- //and send a checkrequest
- //This applies to a normal message and messages requests too
-
- if (!msgs.isEmpty())
- {
- req = new PullMessagesRequest(this.nodeId, tx.getId());
-
- asyncSendRequest(req, fromAddress);
- }
- }
/*
@@ -1199,19 +1066,7 @@
controlMessageDispatcher.castMessage(null, message, GroupRequest.GET_ALL, castTimeout);
}
- /*
- * Unicast a sync request
- */
- private Object syncSendRequest(ClusterRequest request, Address address) throws Exception
- {
- byte[] bytes = writeRequest(request);
-
- Message message = new Message(address, null, bytes);
-
- Object result = controlMessageDispatcher.sendMessage(message, GroupRequest.GET_FIRST, castTimeout);
-
- return result;
- }
+
private void removeBindingsForAddress(Address address) throws Exception
{
@@ -1404,7 +1259,7 @@
public void receive(Message message)
{
- log.info("Received message on control channel: " + message);
+ //log.info("Received message on control channel: " + message);
}
public void setState(byte[] bytes)
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultMessagePullPolicy.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultMessagePullPolicy.java 2006-09-23 01:37:20 UTC (rev 1359)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultMessagePullPolicy.java 2006-09-23 14:21:10 UTC (rev 1360)
@@ -24,6 +24,8 @@
import java.util.Iterator;
import java.util.List;
+import org.jboss.logging.Logger;
+
/**
* A DefaultMessagePullPolicy
*
@@ -37,7 +39,9 @@
*/
public class DefaultMessagePullPolicy implements MessagePullPolicy
{
-
+ private static final Logger log = Logger.getLogger(DefaultMessagePullPolicy.class);
+
+
public ClusteredQueue chooseQueue(List queues)
{
Iterator iter = queues.iterator();
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultRouter.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultRouter.java 2006-09-23 01:37:20 UTC (rev 1359)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultRouter.java 2006-09-23 14:21:10 UTC (rev 1360)
@@ -147,8 +147,6 @@
if (localQueue != null)
{
- log.info("There is a local queue");
-
//The only time the local queue won't accept is if the selector doesn't
//match - in which case it won't match at any other nodes too so no point
//in trying them
@@ -159,7 +157,6 @@
}
else
{
- log.info("No local queue!");
//There is no local shared queue
//We round robin among the rest
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java 2006-09-23 01:37:20 UTC (rev 1359)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java 2006-09-23 14:21:10 UTC (rev 1360)
@@ -23,9 +23,12 @@
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Iterator;
import java.util.List;
+import org.jboss.logging.Logger;
import org.jboss.messaging.core.Delivery;
+import org.jboss.messaging.core.DeliveryObserver;
import org.jboss.messaging.core.Filter;
import org.jboss.messaging.core.MessageReference;
import org.jboss.messaging.core.SimpleDelivery;
@@ -33,7 +36,11 @@
import org.jboss.messaging.core.plugin.contract.MessageStore;
import org.jboss.messaging.core.plugin.contract.PersistenceManager;
import org.jboss.messaging.core.plugin.contract.PostOffice;
+import org.jboss.messaging.core.tx.Transaction;
+import org.jboss.messaging.core.tx.TransactionRepository;
import org.jboss.messaging.util.Future;
+import org.jboss.messaging.util.StreamUtils;
+import org.jgroups.Address;
import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
@@ -49,6 +56,8 @@
*/
public class LocalClusteredQueue extends PagingFilteredQueue implements ClusteredQueue
{
+ private static final Logger log = Logger.getLogger(LocalClusteredQueue.class);
+
private PostOfficeInternal office;
private volatile int lastCount;
@@ -58,36 +67,52 @@
private RemoteQueueStub pullQueue;
private String nodeId;
+
+ //TODO Make configurable
+ private int pullSize;
+
+ private TransactionRepository tr;
+
+ private Object pullLock = new Object();
//TODO - we shouldn't have to specify office AND nodeId
public LocalClusteredQueue(PostOffice office, String nodeId, String name, long id, MessageStore ms, PersistenceManager pm,
boolean acceptReliableMessages, boolean recoverable, QueuedExecutor executor,
- Filter filter,
+ Filter filter, TransactionRepository tr,
int fullSize, int pageSize, int downCacheSize)
{
super(name, id, ms, pm, acceptReliableMessages, recoverable, executor, filter, fullSize, pageSize, downCacheSize);
this.nodeId = nodeId;
+ this.tr = tr;
+
//FIXME - this cast is a hack
this.office = (PostOfficeInternal)office;
}
public LocalClusteredQueue(PostOffice office, String nodeId, String name, long id, MessageStore ms, PersistenceManager pm,
boolean acceptReliableMessages, boolean recoverable, QueuedExecutor executor,
- Filter filter)
+ Filter filter, TransactionRepository tr)
{
super(name, id, ms, pm, acceptReliableMessages, recoverable, executor, filter);
this.nodeId = nodeId;
+ this.tr = tr;
+
//FIXME - this cast is a hack
this.office = (PostOfficeInternal)office;
}
- public void setPullQueue(RemoteQueueStub queue)
+ public void setPullInfo(RemoteQueueStub queue, int pullSize)
{
- this.pullQueue = queue;
+ synchronized (pullLock)
+ {
+ this.pullQueue = queue;
+
+ this.pullSize = pullSize;
+ }
}
public QueueStats getStats()
@@ -100,6 +125,10 @@
lastCount = cnt;
}
+ else
+ {
+ changedSignificantly = false;
+ }
return new QueueStats(name, cnt);
}
@@ -127,6 +156,8 @@
{
List dels = new ArrayList();
+ log.info("getting " + number + " deliveries, there are " + messageRefs.size() + " available");
+
synchronized (refLock)
{
synchronized (deliveryLock)
@@ -134,21 +165,26 @@
//We only get the refs if receiversReady = false so as not to steal messages that
//might be consumed by local receivers
if (!receiversReady)
- {
+ {
+ int count = 0;
+
MessageReference ref;
- while ((ref = removeFirstInMemory()) != null)
+ while (count < number && (ref = removeFirstInMemory()) != null)
{
SimpleDelivery del = new SimpleDelivery(this, ref);
deliveries.add(del);
- dels.add(del);
+ dels.add(del);
+
+ count++;
}
return dels;
}
else
{
+ log.info("Returning an empty list since receivers are ready");
return Collections.EMPTY_LIST;
}
}
@@ -185,26 +221,33 @@
public void acknowledgeFromCluster(Delivery d) throws Throwable
{
- acknowledgeInternal(d, false);
+ acknowledgeInternal(d, null, false, false);
}
protected void deliverInternal(boolean handle) throws Throwable
{
- super.deliverInternal(handle);
+ int beforeSize = -1;
if (!handle)
{
- if (receiversReady)
+ beforeSize = messageRefs.size();
+ }
+
+ super.deliverInternal(handle);
+
+ if (!handle)
+ {
+ int afterSize = messageRefs.size();
+
+ if (receiversReady && beforeSize == 0 && afterSize == 0)
{
//Delivery has been prompted (not from handle call)
//and has run, and there are consumers that are still interested in receiving more
//refs but there are none available in the channel (either the channel is empty
//or there are only refs that don't match any selectors)
//then we should perhaps pull some messages from a remote queue
- if (pullQueue != null)
- {
- office.pullMessages(this, pullQueue);
- }
+ log.info("pulling messages");
+ pullMessages();
}
}
}
@@ -213,4 +256,124 @@
{
return true;
}
+
+ /**
+ * Pull messages from a remote queue to this queue.
+ * If any of the messages are reliable then this needs to be done reliably (i.e. without loss or redelivery)
+ * Normally this would require 2PC which would make performance suck.
+ * However since we know both queues share the same DB then we can do the persistence locally in the same
+ * tx thus avoiding 2PC and maintaining reliability:)
+ * We do the following:
+ *
+ * 1. A tx is started locally
+ * 2. Create deliveries for message(s) on the remote node - bring messages back to the local node
+ * We send a message to the remote node to retrieve a set of deliveries from the queue - it gets a max of num
+ * deliveries.
+ * The unreliable ones can be acknowledged immediately, the reliable ones are not acknowledged and a holding transaction
+ * is placed in the holding area on the remote node, which contains knowledge of the deliveries.
+ * The messages corresponding to the deliveries are returned to the local node
+ * 3. The retrieved messages are added to the local queue in the tx
+ * 4. Deliveries corresponding to the messages retrieved are acknowledged LOCALLY for the remote queue.
+ * 5. The local tx is committed.
+ * 6. Send "commit" message to remote node
+ * 7. "Commit" message is received and deliveries in the holding transaction are acknowledged IN MEMORY only.
+ * On failure, commit or rollback will be called on the holding transaction causing the deliveries to be acked or cancelled
+ * depending on whether they exist in the database
+ *
+ * This method will always be executed on the channel's event queue (via the deliver method)
+ * so no need to do any handles or acks inside another event message
+ */
+ private void pullMessages() throws Throwable
+ {
+ RemoteQueueStub theQueue;
+ int thePullSize;
+
+ synchronized (pullLock)
+ {
+ if (pullQueue == null)
+ {
+ return;
+ }
+ theQueue = pullQueue;
+ thePullSize = pullSize;
+ }
+
+ Address fromAddress = office.getAddressForNodeId(theQueue.getNodeId());
+
+ if (fromAddress == null)
+ {
+ //This is ok - the node might have left the group
+ return;
+ }
+
+ Transaction tx = tr.createTransaction();
+
+ ClusterRequest req = new PullMessagesRequest(this.nodeId, tx.getId(), theQueue.getChannelID(),
+ name, thePullSize);
+
+ log.info(System.identityHashCode(this) + " Executing pull messages request for queue " + name +
+ " pulling from node " + theQueue.getNodeId() + " to node " + this.nodeId);
+
+ byte[] bytes = (byte[])office.syncSendRequest(req, fromAddress);
+
+ log.info( System.identityHashCode(this) +" Executed pull messages request");
+
+ PullMessagesResponse response = new PullMessagesResponse();
+
+ StreamUtils.fromBytes(response, bytes);
+
+ List msgs = response.getMessages();
+
+ log.info(System.identityHashCode(this) + " I retrieved " + msgs.size() + " messages");
+
+ Iterator iter = msgs.iterator();
+
+ while (iter.hasNext())
+ {
+ org.jboss.messaging.core.Message msg = (org.jboss.messaging.core.Message)iter.next();
+
+ MessageReference ref = null;
+
+ try
+ {
+ ref = ms.reference(msg);
+
+ Delivery delRet = handleInternal(null, ref, tx, true, true);
+
+ if (delRet == null || !delRet.isSelectorAccepted())
+ {
+ //This should never happen
+ throw new IllegalStateException("Aaarrgg queue did not accept reference");
+ }
+ }
+ finally
+ {
+ if (ref != null)
+ {
+ ref.releaseMemoryReference();
+ }
+ }
+
+ Delivery del = new SimpleDelivery(this, ref);
+
+ acknowledgeInternal(del, tx, true, true);
+ }
+
+ tx.commit();
+
+ //TODO what if commit throws an exception - this means the commit message doesn't hit the
+ //remote node so the holding transaction stays in the holding area
+ //Need to catch the exception and throw a check message
+ //What we need to do is catch any exceptions at the top of the call, i.e. just after the interface
+ //and send a checkrequest
+ //This applies to a normal message and messages requests too
+
+ if (!msgs.isEmpty())
+ {
+ req = new PullMessagesRequest(this.nodeId, tx.getId());
+
+ office.asyncSendRequest(req, fromAddress);
+ }
+
+ }
}
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PostOfficeInternal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PostOfficeInternal.java 2006-09-23 01:37:20 UTC (rev 1359)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PostOfficeInternal.java 2006-09-23 14:21:10 UTC (rev 1360)
@@ -55,12 +55,14 @@
void routeFromCluster(Message message, String routingKey, Map queueNameNodeIdMap) throws Exception;
- void addToQueue(String queueName, List messages) throws Exception;
+ //void addToQueue(String queueName, List messages) throws Exception;
void asyncSendRequest(ClusterRequest request) throws Exception;
void asyncSendRequest(ClusterRequest request, Address address) throws Exception;
+ Object syncSendRequest(ClusterRequest request, Address address) throws Exception;
+
void holdTransaction(TransactionId id, ClusterTransaction tx) throws Throwable;
void commitTransaction(TransactionId id) throws Throwable;
@@ -69,12 +71,11 @@
void updateQueueStats(String nodeId, List stats) throws Exception;
- void sendStats() throws Exception;
+ void sendQueueStats() throws Exception;
boolean referenceExistsInStorage(long channelID, long messageID) throws Exception;
- List getDeliveries(String queueName, int numMessages) throws Exception;
+ List getDeliveries(String queueName, int numMessages) throws Exception;
- void pullMessages(ClusteredQueue localQueue, ClusteredQueue remoteQueue) throws Throwable;
-
+ Address getAddressForNodeId(String nodeId) throws Exception;
}
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesRequest.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesRequest.java 2006-09-23 01:37:20 UTC (rev 1359)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesRequest.java 2006-09-23 14:21:10 UTC (rev 1360)
@@ -71,17 +71,13 @@
}
Object execute(PostOfficeInternal office) throws Throwable
- {
- log.info("********* executign pull messages requiest");
-
+ {
TransactionId id = new TransactionId(nodeId, txId);
if (hold)
{
List dels = office.getDeliveries(queueName, numMessages);
- log.info("Got a list of " + dels.size() + " deliveries");
-
PullMessagesResponse response = new PullMessagesResponse(dels.size());
if (!dels.isEmpty())
@@ -114,9 +110,7 @@
//Add this to the holding area
office.holdTransaction(id, this);
}
-
- log.info("returning response:" + response);
-
+
//Convert to bytes since the response isn't serializable (nor do we want it to be)
byte[] bytes = StreamUtils.toBytes(response);
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RemoteQueueStub.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RemoteQueueStub.java 2006-09-23 01:37:20 UTC (rev 1359)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RemoteQueueStub.java 2006-09-23 14:21:10 UTC (rev 1360)
@@ -115,9 +115,7 @@
{
//If the message is persistent and we are recoverable then we persist here, *before*
//the message is sent across the network
-
- log.info("Adding ref: " + reference + " in channel " + id);
-
+
pm.addReference(id, reference, tx);
}
catch (Exception e)
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/StatsSender.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/StatsSender.java 2006-09-23 01:37:20 UTC (rev 1359)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/StatsSender.java 2006-09-23 14:21:10 UTC (rev 1360)
@@ -83,25 +83,20 @@
timer.cancel();
timer = null;
- }
+ }
-
-
class SendStatsTimerTask extends TimerTask
{
-
public void run()
{
try
{
- office.sendStats();
+ office.sendQueueStats();
}
catch (Exception e)
{
log.error("Failed to send statistics", e);
}
- }
-
+ }
}
-
}
Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/base/ClusteringTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/base/ClusteringTestBase.java 2006-09-23 01:37:20 UTC (rev 1359)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/base/ClusteringTestBase.java 2006-09-23 14:21:10 UTC (rev 1360)
@@ -250,13 +250,15 @@
}
}
+ private static long msgCount;
+
protected List sendMessages(String condition, boolean persistent, PostOffice office, int num, Transaction tx) throws Exception
{
List list = new ArrayList();
for (int i = 0; i < num; i++)
{
- Message msg = CoreMessageFactory.createCoreMessage(i + 1, persistent, null);
+ Message msg = CoreMessageFactory.createCoreMessage(msgCount++, persistent, null);
MessageReference ref = ms.reference(msg);
Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeTest.java 2006-09-23 01:37:20 UTC (rev 1359)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeTest.java 2006-09-23 14:21:10 UTC (rev 1360)
@@ -103,11 +103,11 @@
//Add a couple of bindings
- LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, "node1", "sub1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+ LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, "node1", "sub1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
Binding binding1 =
office1.bindClusteredQueue("topic1", queue1);
- LocalClusteredQueue queue2 = new LocalClusteredQueue(office1, "node1", "sub2", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+ LocalClusteredQueue queue2 = new LocalClusteredQueue(office1, "node1", "sub2", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
Binding binding2 =
office1.bindClusteredQueue("topic1", queue2);
@@ -126,7 +126,7 @@
//Add another binding on node 2
- LocalClusteredQueue queue3 = new LocalClusteredQueue(office2, "node2", "sub3", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+ LocalClusteredQueue queue3 = new LocalClusteredQueue(office2, "node2", "sub3", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
Binding binding3 =
office2.bindClusteredQueue("topic1", queue3);
@@ -153,7 +153,7 @@
//Add another binding on node 2
- LocalClusteredQueue queue4 = new LocalClusteredQueue(office2, "node2", "sub4", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+ LocalClusteredQueue queue4 = new LocalClusteredQueue(office2, "node2", "sub4", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
Binding binding4 =
office2.bindClusteredQueue("topic1", queue4);
@@ -217,7 +217,7 @@
//Add another binding on node 3
- LocalClusteredQueue queue5 = new LocalClusteredQueue(office3, "node3", "sub5", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+ LocalClusteredQueue queue5 = new LocalClusteredQueue(office3, "node3", "sub5", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
Binding binding5 =
office3.bindClusteredQueue("topic1", queue5);
@@ -253,12 +253,12 @@
//Add a durable and a non durable binding on node 1
- LocalClusteredQueue queue6 = new LocalClusteredQueue(office1, "node1", "sub6", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null);
+ LocalClusteredQueue queue6 = new LocalClusteredQueue(office1, "node1", "sub6", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
Binding binding6 =
office1.bindClusteredQueue("topic1", queue6);
- LocalClusteredQueue queue7 = new LocalClusteredQueue(office1, "node1", "sub7", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+ LocalClusteredQueue queue7 = new LocalClusteredQueue(office1, "node1", "sub7", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
Binding binding7 =
office1.bindClusteredQueue("topic1", queue7);
@@ -501,15 +501,15 @@
office2 = createClusteredPostOffice("node2", "testgroup");
- LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, "node1", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+ LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, "node1", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
Binding binding1 = office1.bindClusteredQueue("queue1", queue1);
- LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, "node2", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+ LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, "node2", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
Binding binding2 = office2.bindClusteredQueue("queue1", queue2);
- LocalClusteredQueue queue3 = new LocalClusteredQueue(office1, "node1", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+ LocalClusteredQueue queue3 = new LocalClusteredQueue(office1, "node1", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
try
{
@@ -520,7 +520,7 @@
{
//Ok
}
- LocalClusteredQueue queue4 = new LocalClusteredQueue(office2, "node2", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+ LocalClusteredQueue queue4 = new LocalClusteredQueue(office2, "node2", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
try
{
@@ -536,7 +536,7 @@
office2.unbindClusteredQueue("queue1");
- LocalClusteredQueue queue5 = new LocalClusteredQueue(office1, "node1", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+ LocalClusteredQueue queue5 = new LocalClusteredQueue(office1, "node1", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
Binding binding5 = office1.bindClusteredQueue("queue1", queue5);
@@ -615,15 +615,15 @@
SimpleFilter filter1 = new SimpleFilter(2);
SimpleFilter filter2 = new SimpleFilter(3);
- LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, "node1", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), filter1);
+ LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, "node1", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), filter1, tr);
Binding binding1 =
office1.bindClusteredQueue("topic1", queue1);
- LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, "node2", "queue2", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), filter2);
+ LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, "node2", "queue2", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), filter2, tr);
Binding binding2 =
office2.bindClusteredQueue("topic1", queue2);
- LocalClusteredQueue queue3 = new LocalClusteredQueue(office2, "node2", "queue3", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+ LocalClusteredQueue queue3 = new LocalClusteredQueue(office2, "node2", "queue3", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
Binding binding3 =
office2.bindClusteredQueue("topic1", queue3);
@@ -719,52 +719,52 @@
LocalClusteredQueue[] queues = new LocalClusteredQueue[16];
Binding[] bindings = new Binding[16];
- queues[0] = new LocalClusteredQueue(office1, "node1", "sub1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+ queues[0] = new LocalClusteredQueue(office1, "node1", "sub1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
bindings[0] = office1.bindClusteredQueue("topic1", queues[0]);
- queues[1] = new LocalClusteredQueue(office1, "node1", "sub2", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+ queues[1] = new LocalClusteredQueue(office1, "node1", "sub2", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
bindings[1] = office1.bindClusteredQueue("topic1", queues[1]);
- queues[2] = new LocalClusteredQueue(office2, "node2", "sub3", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+ queues[2] = new LocalClusteredQueue(office2, "node2", "sub3", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
bindings[2] = office2.bindClusteredQueue("topic1", queues[2]);
- queues[3] = new LocalClusteredQueue(office2, "node2", "sub4", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+ queues[3] = new LocalClusteredQueue(office2, "node2", "sub4", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
bindings[3] = office2.bindClusteredQueue("topic1", queues[3]);
- queues[4] = new LocalClusteredQueue(office2, "node2", "sub5", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null);
+ queues[4] = new LocalClusteredQueue(office2, "node2", "sub5", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
bindings[4] = office2.bindClusteredQueue("topic1", queues[4]);
- queues[5] = new LocalClusteredQueue(office1, "node1", "sub6", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+ queues[5] = new LocalClusteredQueue(office1, "node1", "sub6", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
bindings[5] = office1.bindClusteredQueue("topic1", queues[5]);
- queues[6] = new LocalClusteredQueue(office1, "node1", "sub7", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null);
+ queues[6] = new LocalClusteredQueue(office1, "node1", "sub7", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
bindings[6] = office1.bindClusteredQueue("topic1", queues[6]);
- queues[7] = new LocalClusteredQueue(office1, "node1", "sub8", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null);
+ queues[7] = new LocalClusteredQueue(office1, "node1", "sub8", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
bindings[7] = office1.bindClusteredQueue("topic1", queues[7]);
- queues[8] = new LocalClusteredQueue(office1, "node1", "sub9", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+ queues[8] = new LocalClusteredQueue(office1, "node1", "sub9", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
bindings[8] = office1.bindClusteredQueue("topic2", queues[8]);
- queues[9] = new LocalClusteredQueue(office1, "node1", "sub10", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+ queues[9] = new LocalClusteredQueue(office1, "node1", "sub10", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
bindings[9] = office1.bindClusteredQueue("topic2", queues[9]);
- queues[10] = new LocalClusteredQueue(office2, "node2", "sub11", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+ queues[10] = new LocalClusteredQueue(office2, "node2", "sub11", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
bindings[10] = office2.bindClusteredQueue("topic2", queues[10]);
- queues[11] = new LocalClusteredQueue(office2, "node2", "sub12", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+ queues[11] = new LocalClusteredQueue(office2, "node2", "sub12", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
bindings[11] = office2.bindClusteredQueue("topic2", queues[11]);
- queues[12] = new LocalClusteredQueue(office2, "node2", "sub13", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null);
+ queues[12] = new LocalClusteredQueue(office2, "node2", "sub13", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
bindings[12] = office2.bindClusteredQueue("topic2", queues[12]);
- queues[13] = new LocalClusteredQueue(office1, "node1", "sub14", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+ queues[13] = new LocalClusteredQueue(office1, "node1", "sub14", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
bindings[13] = office1.bindClusteredQueue("topic2", queues[13]);
- queues[14] = new LocalClusteredQueue(office1, "node1", "sub15", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null);
+ queues[14] = new LocalClusteredQueue(office1, "node1", "sub15", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
bindings[14] = office1.bindClusteredQueue("topic2", queues[14]);
- queues[15] = new LocalClusteredQueue(office1, "node1", "sub16", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null);
+ queues[15] = new LocalClusteredQueue(office1, "node1", "sub16", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
bindings[15] = office1.bindClusteredQueue("topic2", queues[15]);
SimpleReceiver[] receivers = new SimpleReceiver[16];
@@ -899,27 +899,27 @@
//We deploy the queue on nodes 1, 2, 3, 4 and 5
//We don't deploy on node 6
- LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, "node1", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+ LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, "node1", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
Binding binding1 = office1.bindClusteredQueue("queue1", queue1);
SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
queue1.add(receiver1);
- LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, "node2", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+ LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, "node2", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
Binding binding2 = office2.bindClusteredQueue("queue1", queue2);
SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
queue2.add(receiver2);
- LocalClusteredQueue queue3 = new LocalClusteredQueue(office3, "node3", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+ LocalClusteredQueue queue3 = new LocalClusteredQueue(office3, "node3", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
Binding binding3 = office3.bindClusteredQueue("queue1", queue3);
SimpleReceiver receiver3 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
queue3.add(receiver3);
- LocalClusteredQueue queue4 = new LocalClusteredQueue(office4, "node4", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+ LocalClusteredQueue queue4 = new LocalClusteredQueue(office4, "node4", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
Binding binding4 = office4.bindClusteredQueue("queue1", queue4);
SimpleReceiver receiver4 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
queue4.add(receiver4);
- LocalClusteredQueue queue5 = new LocalClusteredQueue(office5, "node5", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+ LocalClusteredQueue queue5 = new LocalClusteredQueue(office5, "node5", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
Binding binding5 = office5.bindClusteredQueue("queue1", queue5);
SimpleReceiver receiver5 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
queue5.add(receiver5);
@@ -1081,13 +1081,13 @@
//======
//Non durable 1 on node 2
- LocalClusteredQueue nonDurable1 = new LocalClusteredQueue(office2, "node2", "nondurable1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+ LocalClusteredQueue nonDurable1 = new LocalClusteredQueue(office2, "node2", "nondurable1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
Binding binding1 = office2.bindClusteredQueue("topic", nonDurable1);
SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
nonDurable1.add(receiver1);
//Non durable 2 on node 2
- LocalClusteredQueue nonDurable2 = new LocalClusteredQueue(office2, "node2", "nondurable2", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+ LocalClusteredQueue nonDurable2 = new LocalClusteredQueue(office2, "node2", "nondurable2", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
Binding binding2 = office2.bindClusteredQueue("topic", nonDurable2);
SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
nonDurable2.add(receiver2);
@@ -1096,13 +1096,13 @@
//======
//Non shared durable
- LocalClusteredQueue nonSharedDurable1 = new LocalClusteredQueue(office3, "node3", "nonshareddurable1", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null);
+ LocalClusteredQueue nonSharedDurable1 = new LocalClusteredQueue(office3, "node3", "nonshareddurable1", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
Binding binding3 = office3.bindClusteredQueue("topic", nonSharedDurable1);
SimpleReceiver receiver3 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
nonSharedDurable1.add(receiver3);
//Non durable
- LocalClusteredQueue nonDurable3 = new LocalClusteredQueue(office3, "node3", "nondurable3", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+ LocalClusteredQueue nonDurable3 = new LocalClusteredQueue(office3, "node3", "nondurable3", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
Binding binding4 = office3.bindClusteredQueue("topic", nonDurable3);
SimpleReceiver receiver4 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
nonDurable3.add(receiver4);
@@ -1111,31 +1111,31 @@
//======
//Shared durable
- LocalClusteredQueue sharedDurable1 = new LocalClusteredQueue(office4, "node4", "shareddurable1", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null);
+ LocalClusteredQueue sharedDurable1 = new LocalClusteredQueue(office4, "node4", "shareddurable1", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
Binding binding5 = office4.bindClusteredQueue("topic", sharedDurable1);
SimpleReceiver receiver5 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
sharedDurable1.add(receiver5);
//Non shared durable
- LocalClusteredQueue nonSharedDurable2 = new LocalClusteredQueue(office4, "node4", "nonshareddurable2", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null);
+ LocalClusteredQueue nonSharedDurable2 = new LocalClusteredQueue(office4, "node4", "nonshareddurable2", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
Binding binding6 = office4.bindClusteredQueue("topic", nonSharedDurable2);
SimpleReceiver receiver6 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
nonSharedDurable2.add(receiver6);
//Non durable
- LocalClusteredQueue nonDurable4 = new LocalClusteredQueue(office4, "node4", "nondurable4", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+ LocalClusteredQueue nonDurable4 = new LocalClusteredQueue(office4, "node4", "nondurable4", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
Binding binding7 = office4.bindClusteredQueue("topic", nonDurable4);
SimpleReceiver receiver7 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
nonDurable4.add(receiver7);
// Non durable
- LocalClusteredQueue nonDurable5 = new LocalClusteredQueue(office4, "node4", "nondurable5", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+ LocalClusteredQueue nonDurable5 = new LocalClusteredQueue(office4, "node4", "nondurable5", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
Binding binding8 = office4.bindClusteredQueue("topic", nonDurable5);
SimpleReceiver receiver8 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
nonDurable5.add(receiver8);
//Non durable
- LocalClusteredQueue nonDurable6 = new LocalClusteredQueue(office4, "node4", "nondurable6", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+ LocalClusteredQueue nonDurable6 = new LocalClusteredQueue(office4, "node4", "nondurable6", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
Binding binding9 = office4.bindClusteredQueue("topic", nonDurable6);
SimpleReceiver receiver9 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
nonDurable6.add(receiver9);
@@ -1143,32 +1143,32 @@
// Node 5
//=======
//Shared durable
- LocalClusteredQueue sharedDurable2 = new LocalClusteredQueue(office5, "node5", "shareddurable1", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null);
+ LocalClusteredQueue sharedDurable2 = new LocalClusteredQueue(office5, "node5", "shareddurable1", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
Binding binding10 = office5.bindClusteredQueue("topic", sharedDurable2);
SimpleReceiver receiver10 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
sharedDurable2.add(receiver10);
//Shared durable
- LocalClusteredQueue sharedDurable3 = new LocalClusteredQueue(office5, "node5", "shareddurable2", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null);
+ LocalClusteredQueue sharedDurable3 = new LocalClusteredQueue(office5, "node5", "shareddurable2", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
Binding binding11 = office5.bindClusteredQueue("topic", sharedDurable3);
SimpleReceiver receiver11 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
sharedDurable3.add(receiver11);
// Node 6
//=========
- LocalClusteredQueue sharedDurable4 = new LocalClusteredQueue(office6, "node6", "shareddurable2", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null);
+ LocalClusteredQueue sharedDurable4 = new LocalClusteredQueue(office6, "node6", "shareddurable2", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
Binding binding12 = office6.bindClusteredQueue("topic", sharedDurable4);
SimpleReceiver receiver12 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
sharedDurable4.add(receiver12);
- LocalClusteredQueue nonDurable7 = new LocalClusteredQueue(office6, "node6", "nondurable7", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+ LocalClusteredQueue nonDurable7 = new LocalClusteredQueue(office6, "node6", "nondurable7", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
Binding binding13 = office6.bindClusteredQueue("topic", nonDurable7);
SimpleReceiver receiver13 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
nonDurable7.add(receiver13);
//Node 7
//=======
- LocalClusteredQueue sharedDurable5 = new LocalClusteredQueue(office7, "node7", "shareddurable2", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null);
+ LocalClusteredQueue sharedDurable5 = new LocalClusteredQueue(office7, "node7", "shareddurable2", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
Binding binding14 = office7.bindClusteredQueue("topic", sharedDurable5);
SimpleReceiver receiver14 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
sharedDurable5.add(receiver14);
@@ -1502,52 +1502,52 @@
LocalClusteredQueue[] queues = new LocalClusteredQueue[16];
Binding[] bindings = new Binding[16];
- queues[0] = new LocalClusteredQueue(office1, "node1", "sub1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+ queues[0] = new LocalClusteredQueue(office1, "node1", "sub1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
bindings[0] = office1.bindClusteredQueue("topic1", queues[0]);
- queues[1] = new LocalClusteredQueue(office1, "node1", "sub2", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+ queues[1] = new LocalClusteredQueue(office1, "node1", "sub2", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
bindings[1] = office1.bindClusteredQueue("topic1", queues[1]);
- queues[2] = new LocalClusteredQueue(office2, "node2", "sub3", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+ queues[2] = new LocalClusteredQueue(office2, "node2", "sub3", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
bindings[2] = office2.bindClusteredQueue("topic1", queues[2]);
- queues[3] = new LocalClusteredQueue(office2, "node2", "sub4", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+ queues[3] = new LocalClusteredQueue(office2, "node2", "sub4", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
bindings[3] = office2.bindClusteredQueue("topic1", queues[3]);
- queues[4] = new LocalClusteredQueue(office2, "node2", "sub5", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null);
+ queues[4] = new LocalClusteredQueue(office2, "node2", "sub5", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
bindings[4] = office2.bindClusteredQueue("topic1", queues[4]);
- queues[5] = new LocalClusteredQueue(office1, "node1", "sub6", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+ queues[5] = new LocalClusteredQueue(office1, "node1", "sub6", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
bindings[5] = office1.bindClusteredQueue("topic1", queues[5]);
- queues[6] = new LocalClusteredQueue(office1, "node1", "sub7", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null);
+ queues[6] = new LocalClusteredQueue(office1, "node1", "sub7", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
bindings[6] = office1.bindClusteredQueue("topic1", queues[6]);
- queues[7] = new LocalClusteredQueue(office1, "node1", "sub8", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null);
+ queues[7] = new LocalClusteredQueue(office1, "node1", "sub8", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
bindings[7] = office1.bindClusteredQueue("topic1", queues[7]);
- queues[8] = new LocalClusteredQueue(office1, "node1", "sub9", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+ queues[8] = new LocalClusteredQueue(office1, "node1", "sub9", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
bindings[8] = office1.bindClusteredQueue("topic2", queues[8]);
- queues[9] = new LocalClusteredQueue(office1, "node1", "sub10", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+ queues[9] = new LocalClusteredQueue(office1, "node1", "sub10", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
bindings[9] = office1.bindClusteredQueue("topic2", queues[9]);
- queues[10] = new LocalClusteredQueue(office2, "node2", "sub11", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+ queues[10] = new LocalClusteredQueue(office2, "node2", "sub11", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
bindings[10] = office2.bindClusteredQueue("topic2", queues[10]);
- queues[11] = new LocalClusteredQueue(office2, "node2", "sub12", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+ queues[11] = new LocalClusteredQueue(office2, "node2", "sub12", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
bindings[11] = office2.bindClusteredQueue("topic2", queues[11]);
- queues[12] = new LocalClusteredQueue(office2, "node2", "sub13", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null);
+ queues[12] = new LocalClusteredQueue(office2, "node2", "sub13", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
bindings[12] = office2.bindClusteredQueue("topic2", queues[12]);
- queues[13] = new LocalClusteredQueue(office1, "node1", "sub14", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+ queues[13] = new LocalClusteredQueue(office1, "node1", "sub14", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
bindings[13] = office1.bindClusteredQueue("topic2", queues[13]);
- queues[14] = new LocalClusteredQueue(office1, "node1", "sub15", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null);
+ queues[14] = new LocalClusteredQueue(office1, "node1", "sub15", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
bindings[14] = office1.bindClusteredQueue("topic2", queues[14]);
- queues[15] = new LocalClusteredQueue(office1, "node1", "sub16", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null);
+ queues[15] = new LocalClusteredQueue(office1, "node1", "sub16", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
bindings[15] = office1.bindClusteredQueue("topic2", queues[15]);
SimpleReceiver[] receivers = new SimpleReceiver[16];
Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultRouterTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultRouterTest.java 2006-09-23 01:37:20 UTC (rev 1359)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultRouterTest.java 2006-09-23 14:21:10 UTC (rev 1360)
@@ -123,27 +123,27 @@
office6 = createClusteredPostOffice("node6", "testgroup");
- LocalClusteredQueue queue1 = new LocalClusteredQueue(office2, "node2", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+ LocalClusteredQueue queue1 = new LocalClusteredQueue(office2, "node2", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
Binding binding1 = office2.bindClusteredQueue("topic", queue1);
SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
queue1.add(receiver1);
- LocalClusteredQueue queue2 = new LocalClusteredQueue(office3, "node3", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+ LocalClusteredQueue queue2 = new LocalClusteredQueue(office3, "node3", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
Binding binding2 = office3.bindClusteredQueue("topic", queue2);
SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
queue2.add(receiver2);
- LocalClusteredQueue queue3 = new LocalClusteredQueue(office4, "node4", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+ LocalClusteredQueue queue3 = new LocalClusteredQueue(office4, "node4", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
Binding binding3 = office4.bindClusteredQueue("topic", queue3);
SimpleReceiver receiver3 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
queue3.add(receiver3);
- LocalClusteredQueue queue4 = new LocalClusteredQueue(office5, "node5", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+ LocalClusteredQueue queue4 = new LocalClusteredQueue(office5, "node5", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
Binding binding4 = office5.bindClusteredQueue("topic", queue4);
SimpleReceiver receiver4 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
queue4.add(receiver4);
- LocalClusteredQueue queue5 = new LocalClusteredQueue(office6, "node6", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+ LocalClusteredQueue queue5 = new LocalClusteredQueue(office6, "node6", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
Binding binding5 = office6.bindClusteredQueue("topic", queue5);
SimpleReceiver receiver5 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
queue5.add(receiver5);
@@ -262,27 +262,27 @@
office6 = createClusteredPostOffice("node6", "testgroup");
- LocalClusteredQueue queue1 = new LocalClusteredQueue(office2, "node2", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+ LocalClusteredQueue queue1 = new LocalClusteredQueue(office2, "node2", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
Binding binding1 = office2.bindClusteredQueue("topic", queue1);
SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
queue1.add(receiver1);
- LocalClusteredQueue queue2 = new LocalClusteredQueue(office3, "node3", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+ LocalClusteredQueue queue2 = new LocalClusteredQueue(office3, "node3", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
Binding binding2 = office3.bindClusteredQueue("topic", queue2);
SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
queue2.add(receiver2);
- LocalClusteredQueue queue3 = new LocalClusteredQueue(office4, "node4", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+ LocalClusteredQueue queue3 = new LocalClusteredQueue(office4, "node4", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
Binding binding3 = office4.bindClusteredQueue("topic", queue3);
SimpleReceiver receiver3 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
queue3.add(receiver3);
- LocalClusteredQueue queue4 = new LocalClusteredQueue(office5, "node5", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+ LocalClusteredQueue queue4 = new LocalClusteredQueue(office5, "node5", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
Binding binding4 = office5.bindClusteredQueue("topic", queue4);
SimpleReceiver receiver4 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
queue4.add(receiver4);
- LocalClusteredQueue queue5 = new LocalClusteredQueue(office6, "node6", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+ LocalClusteredQueue queue5 = new LocalClusteredQueue(office6, "node6", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
Binding binding5 = office6.bindClusteredQueue("topic", queue5);
SimpleReceiver receiver5 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
queue5.add(receiver5);
Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/JGroupsUtil.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/JGroupsUtil.java 2006-09-23 01:37:20 UTC (rev 1359)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/JGroupsUtil.java 2006-09-23 14:21:10 UTC (rev 1360)
@@ -46,15 +46,15 @@
}
return
- "UDP(mcast_addr=228.8.8.8;mcast_port=45566;ip_ttl=32;bind_addr=" + host + "):" +
- "PING(timeout=2000;num_initial_members=3):"+
- "FD(timeout=3000):"+
- "VERIFY_SUSPECT(timeout=1500):"+
- "pbcast.NAKACK(gc_lag=10;retransmit_timeout=600,1200,2400,4800):"+
- "UNICAST(timeout=600,1200,2400,4800):"+
- "pbcast.STABLE(desired_avg_gossip=10000):"+
- "FRAG:"+
- "pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;shun=true;print_local_addr=true)";
+ "UDP(mcast_addr=228.8.8.8;mcast_port=45566;ip_ttl=32;bind_addr=" + host + ";up_thread=false;down_thread=false):" +
+ "PING(timeout=2000;num_initial_members=3;up_thread=false;down_thread=false):"+
+ "FD(timeout=3000;up_thread=false;down_thread=false):"+
+ "VERIFY_SUSPECT(timeout=1500;up_thread=false;down_thread=false):"+
+ "pbcast.NAKACK(gc_lag=10;retransmit_timeout=600,1200,2400,4800;up_thread=false;down_thread=false):"+
+ "UNICAST(timeout=600,1200,2400,4800;up_thread=false;down_thread=false):"+
+ "pbcast.STABLE(desired_avg_gossip=10000;up_thread=false;down_thread=false):"+
+ "FRAG(up_thread=false;down_thread=false):"+
+ "pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;shun=true;print_local_addr=true;up_thread=false;down_thread=false)";
}
@@ -73,15 +73,15 @@
}
return
- "UDP(mcast_addr=228.8.8.8;mcast_port=45568;ip_ttl=32;bind_addr=" + host + "):" +
- "PING(timeout=2000;num_initial_members=3):"+
- "FD(timeout=3000):"+
- "VERIFY_SUSPECT(timeout=1500):"+
- "pbcast.NAKACK(gc_lag=10;retransmit_timeout=600,1200,2400,4800):"+
- "pbcast.STABLE(desired_avg_gossip=10000):"+
- "FRAG:"+
- "pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;shun=true;print_local_addr=true):" +
- "pbcast.STATE_TRANSFER";
+ "UDP(mcast_addr=228.8.8.8;mcast_port=45568;ip_ttl=32;bind_addr=" + host + ";up_thread=false;down_thread=false):" +
+ "PING(timeout=2000;num_initial_members=3;up_thread=false;down_thread=false):"+
+ "FD(timeout=3000;up_thread=false;down_thread=false):"+
+ "VERIFY_SUSPECT(timeout=1500;up_thread=false;down_thread=false):"+
+ "pbcast.NAKACK(gc_lag=10;retransmit_timeout=600,1200,2400,4800;up_thread=false;down_thread=false):"+
+ "pbcast.STABLE(desired_avg_gossip=10000;up_thread=false;down_thread=false):"+
+ "FRAG(up_thread=false;down_thread=false):"+
+ "pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;shun=true;print_local_addr=true;up_thread=false;down_thread=false):" +
+ "pbcast.STATE_TRANSFER(up_thread=false;down_thread=false)";
}
// Attributes ----------------------------------------------------
Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RedistributionTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RedistributionTest.java 2006-09-23 01:37:20 UTC (rev 1359)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RedistributionTest.java 2006-09-23 14:21:10 UTC (rev 1360)
@@ -105,46 +105,112 @@
office5 = createClusteredPostOffice("node5", "testgroup");
- LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, "node1", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+ log.info("Started offices");
+
+ LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, "node1", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
Binding binding1 = office1.bindClusteredQueue("queue1", queue1);
+
+ LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, "node2", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+ Binding binding2 = office2.bindClusteredQueue("queue1", queue2);
+
+ LocalClusteredQueue queue3 = new LocalClusteredQueue(office3, "node3", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+ Binding binding3 = office3.bindClusteredQueue("queue1", queue3);
+
+ LocalClusteredQueue queue4 = new LocalClusteredQueue(office4, "node4", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+ Binding binding4 = office4.bindClusteredQueue("queue1", queue4);
+
+ LocalClusteredQueue queue5 = new LocalClusteredQueue(office5, "node5", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+ Binding binding5 = office5.bindClusteredQueue("queue1", queue5);
+
+ log.info("bound queues");
+
+ //Send 30 messages to each queue
+ this.sendMessages("queue1", persistent, office1, 30, null);
+ this.sendMessages("queue1", persistent, office2, 30, null);
+ this.sendMessages("queue1", persistent, office3, 30, null);
+ this.sendMessages("queue1", persistent, office4, 30, null);
+ this.sendMessages("queue1", persistent, office5, 30, null);
+
+ log.info("sent messages");
+
+ Thread.sleep(1000);
+
+ //Check the sizes
+
+ List msgs = queue1.browse();
+ assertEquals(30, msgs.size());
+
+ msgs = queue2.browse();
+ assertEquals(30, msgs.size());
+
+ msgs = queue3.browse();
+ assertEquals(30, msgs.size());
+
+ msgs = queue4.browse();
+ assertEquals(30, msgs.size());
+
+ msgs = queue5.browse();
+ assertEquals(30, msgs.size());
+
+ //Now we add the receivers
+ //Note that we did not do this before the send.
+ //If we had done so then it's likely that the automatic redistribution
+ //would have moved some around and there wouldn't be 30 in each queue
+
PullingReceiver receiver1 = new PullingReceiver();
queue1.add(receiver1);
- LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, "node2", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
- Binding binding2 = office2.bindClusteredQueue("queue1", queue2);
PullingReceiver receiver2 = new PullingReceiver();
queue2.add(receiver2);
- LocalClusteredQueue queue3 = new LocalClusteredQueue(office3, "node3", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
- Binding binding3 = office3.bindClusteredQueue("queue1", queue3);
PullingReceiver receiver3 = new PullingReceiver();
queue3.add(receiver3);
- LocalClusteredQueue queue4 = new LocalClusteredQueue(office4, "node4", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
- Binding binding4 = office4.bindClusteredQueue("queue1", queue4);
PullingReceiver receiver4 = new PullingReceiver();
queue4.add(receiver4);
- LocalClusteredQueue queue5 = new LocalClusteredQueue(office5, "node5", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
- Binding binding5 = office5.bindClusteredQueue("queue1", queue5);
PullingReceiver receiver5 = new PullingReceiver();
queue5.add(receiver5);
+
+ log.info("Added receivers");
- //Send 30 messages to each queue
- this.sendMessages("queue1", persistent, office1, 30, null);
- this.sendMessages("queue1", persistent, office2, 30, null);
- this.sendMessages("queue1", persistent, office3, 30, null);
- this.sendMessages("queue1", persistent, office4, 30, null);
- this.sendMessages("queue1", persistent, office5, 30, null);
+ //Prompt delivery so a message pops into each receiver
+ queue1.deliver(true);
+ queue2.deliver(true);
+ queue3.deliver(true);
+ queue4.deliver(true);
+ queue5.deliver(true);
- Thread.sleep(500);
+ Thread.sleep(1000);
- List msgs = queue1.browse();
+ //Now we check the sizes again in case automatic balancing has erroneously
+ //kicked in
+
+ msgs = queue1.browse();
assertEquals(30, msgs.size());
msgs = queue2.browse();
assertEquals(30, msgs.size());
+
+ msgs = queue3.browse();
+ assertEquals(30, msgs.size());
+ msgs = queue4.browse();
+ assertEquals(30, msgs.size());
+
+ msgs = queue5.browse();
+ assertEquals(30, msgs.size());
+
+ Thread.sleep(5000);
+
+ //And again - should still be no redistribution
+
+ msgs = queue1.browse();
+ assertEquals(30, msgs.size());
+
+ msgs = queue2.browse();
+ assertEquals(30, msgs.size());
+
msgs = queue3.browse();
assertEquals(30, msgs.size());
@@ -154,24 +220,103 @@
msgs = queue5.browse();
assertEquals(30, msgs.size());
- //Consume all the messages from queue 3
- for (int i = 0; i < 30; i++)
- {
+ //Try and consume them all via one receiver
+
+ log.info("trying to consume");
+
+ //So we have 150 messages in total - 30 on each node.
+
+ //If redistribution works ok, we should be able to do something like the following:
+
+ //Consume 10 on node 1
+
+ //Consume 50 on node 2
+
+ //Consume 75 on node 3
+
+ //Consume 10 on node 4
+
+ //Consume 5 on node 5
+
+ log.info("consuming queue1");
+ for (int i = 0; i < 10; i++)
+ {
+ queue1.deliver(true);
+ Delivery del = receiver1.getDelivery();
+ log.info("Got delivery: " + del.getReference().getMessageID());
+ del.acknowledge(null);
+ }
+ log.info("consumed queue1");
+
+ log.info("consuming queue2");
+ for (int i = 0; i < 50; i++)
+ {
+ queue2.deliver(true);
+ Delivery del = receiver2.getDelivery();
+ log.info("Got delivery: " + del.getReference().getMessageID());
+ del.acknowledge(null);
+ }
+
+ log.info("consuming queue3");
+ for (int i = 0; i < 75; i++)
+ {
+ queue3.deliver(true);
Delivery del = receiver3.getDelivery();
log.info("Got delivery: " + del.getReference().getMessageID());
- del.acknowledge(null);
- queue3.deliver(false);
+ del.acknowledge(null);
}
+ log.info("consuming queue4");
+ for (int i = 0; i < 10; i++)
+ {
+ queue4.deliver(true);
+ Delivery del = receiver4.getDelivery();
+ log.info("Got delivery: " + del.getReference().getMessageID());
+ del.acknowledge(null);
+ }
+
+ Thread.sleep(2000);
+
+ log.info("Here are the sizes:");
+
+ msgs = queue1.browse();
+ log.info("queue1: " + msgs.size());
+
+ msgs = queue2.browse();
+ log.info("queue2: " + msgs.size());
+
msgs = queue3.browse();
+ log.info("queue3: " + msgs.size());
+
+ msgs = queue4.browse();
+ log.info("queue4: " + msgs.size());
+
+ msgs = queue5.browse();
+ log.info("queue5: " + msgs.size());
+
+ log.info("consuming queue5");
+ for (int i = 0; i < 5; i++)
+ {
+ queue5.deliver(true);
+ Delivery del = receiver5.getDelivery();
+ log.info("Got delivery: " + del.getReference().getMessageID());
+ del.acknowledge(null);
+ }
+
+ msgs = queue1.browse();
assertEquals(0, msgs.size());
- queue3.deliver(false);
+ msgs = queue2.browse();
+ assertEquals(0, msgs.size());
+
+ msgs = queue3.browse();
+ assertEquals(0, msgs.size());
- Delivery del = receiver3.getDelivery();
+ msgs = queue4.browse();
+ assertEquals(0, msgs.size());
- log.info("delivery is " + del);
-
+ msgs = queue5.browse();
+ assertEquals(0, msgs.size());
}
finally
{
More information about the jboss-cvs-commits
mailing list