[jboss-cvs] JBoss Messaging SVN: r1474 - in trunk: src/main/org/jboss/jms/server/endpoint src/main/org/jboss/jms/tx src/main/org/jboss/messaging/core src/main/org/jboss/messaging/core/plugin src/main/org/jboss/messaging/core/plugin/contract src/main/org/jboss/messaging/core/plugin/postoffice src/main/org/jboss/messaging/core/plugin/postoffice/cluster src/main/org/jboss/messaging/core/tx tests tests/src/org/jboss/test/messaging/core/paging/base tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster tests/src/org/jboss/test/messaging/jms tests/src/org/jboss/test/messaging/jms/crash tests/src/org/jboss/test/messaging/jms/server/destination
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Oct 16 18:03:23 EDT 2006
Author: timfox
Date: 2006-10-16 18:03:03 -0400 (Mon, 16 Oct 2006)
New Revision: 1474
Added:
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesResultRequest.java
trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RedistributionTest.java
Removed:
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesResponse.java
Modified:
trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
trunk/src/main/org/jboss/jms/tx/AckInfo.java
trunk/src/main/org/jboss/jms/tx/ResourceManagerFactory.java
trunk/src/main/org/jboss/messaging/core/ChannelSupport.java
trunk/src/main/org/jboss/messaging/core/plugin/LockMap.java
trunk/src/main/org/jboss/messaging/core/plugin/contract/ClusteredPostOffice.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusterRequest.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultRouter.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PostOfficeInternal.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesRequest.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/TransactionId.java
trunk/src/main/org/jboss/messaging/core/tx/Transaction.java
trunk/src/main/org/jboss/messaging/core/tx/TransactionRepository.java
trunk/tests/build.xml
trunk/tests/src/org/jboss/test/messaging/core/paging/base/PagingStateTestBase.java
trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeTest.java
trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultRouterTest.java
trunk/tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java
trunk/tests/src/org/jboss/test/messaging/jms/crash/ManualCrashTest.java
trunk/tests/src/org/jboss/test/messaging/jms/server/destination/QueueManagementTest.java
Log:
Interim commit for http://jira.jboss.com/jira/browse/JBMESSAGING-575
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2006-10-13 12:09:35 UTC (rev 1473)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2006-10-16 22:03:03 UTC (rev 1474)
@@ -499,7 +499,7 @@
}
else
{
- throw new IllegalStateException("Failed to acknowledge delivery " + d);
+ throw new IllegalStateException("Could not find delivery to acknowledge");
}
}
@@ -799,7 +799,7 @@
* QueueExecutor might be shared by other consumers and we don't want to wait for their
* tasks to complete
*/
- private class Waiter implements Runnable
+ private static class Waiter implements Runnable
{
Future result;
Modified: trunk/src/main/org/jboss/jms/tx/AckInfo.java
===================================================================
--- trunk/src/main/org/jboss/jms/tx/AckInfo.java 2006-10-13 12:09:35 UTC (rev 1473)
+++ trunk/src/main/org/jboss/jms/tx/AckInfo.java 2006-10-16 22:03:03 UTC (rev 1474)
@@ -48,8 +48,7 @@
// One of Session.AUTO_ACKNOWLEDGE, Session.CLIENT_ACKNOWLEDGE, etc.
private int ackMode;
- // The actual proxy must not get serialized
- protected transient MessageProxy msg;
+ protected MessageProxy msg;
// Static --------------------------------------------------------
Modified: trunk/src/main/org/jboss/jms/tx/ResourceManagerFactory.java
===================================================================
--- trunk/src/main/org/jboss/jms/tx/ResourceManagerFactory.java 2006-10-13 12:09:35 UTC (rev 1473)
+++ trunk/src/main/org/jboss/jms/tx/ResourceManagerFactory.java 2006-10-16 22:03:03 UTC (rev 1474)
@@ -41,7 +41,7 @@
*/
public class ResourceManagerFactory
{
- public static ResourceManagerFactory instance = new ResourceManagerFactory();
+ public static final ResourceManagerFactory instance = new ResourceManagerFactory();
private Map holders;
@@ -97,7 +97,7 @@
}
}
- private class Holder
+ private static class Holder
{
ResourceManager rm = new ResourceManager();
Modified: trunk/src/main/org/jboss/messaging/core/ChannelSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/ChannelSupport.java 2006-10-13 12:09:35 UTC (rev 1473)
+++ trunk/src/main/org/jboss/messaging/core/ChannelSupport.java 2006-10-16 22:03:03 UTC (rev 1474)
@@ -168,7 +168,7 @@
}
else
{
- return handleInternal(sender, ref, tx, true, false, true);
+ return handleInternal(sender, ref, tx, true, false);
}
}
@@ -521,34 +521,6 @@
// Protected -----------------------------------------------------
- protected MessageReference nextReference(ListIterator iter, boolean handle) throws Throwable
- {
- MessageReference ref;
-
- if (iter == null)
- {
- //We just get the next ref from the head of the queue
- ref = (MessageReference) messageRefs.peekFirst();
- }
- else
- {
- // TODO This will not work with paged refs - see http://jira.jboss.com/jira/browse/JBMESSAGING-275
- // We need to extend it to work with refs from the db
-
- //We have an iterator - this means we are iterating through the queue to find a ref that matches
- if (iter.hasNext())
- {
- ref = (MessageReference)iter.next();
- }
- else
- {
- ref = null;
- }
- }
-
- return ref;
- }
-
/*
* This method delivers as many messages as possible to the router until no
* more deliveries are returned. This method should never be called at the
@@ -556,7 +528,7 @@
*
* @see org.jboss.messaging.core.Channel#deliver()
*/
- protected void deliverInternal(boolean handle) throws Throwable
+ protected void deliverInternal() throws Throwable
{
try
{
@@ -570,7 +542,7 @@
{
synchronized (deliveryLock)
{
- ref = nextReference(iter, handle);
+ ref = nextReference(iter);
}
if (ref != null)
{
@@ -587,6 +559,8 @@
ref.incrementDeliveryCount();
Delivery del = router.handle(this, ref, null);
+
+ receiversReady = del != null;
if (del == null)
{
@@ -597,8 +571,6 @@
ref.decrementDeliveryCount();
- receiversReady = false;
-
break;
}
else if (!del.isSelectorAccepted())
@@ -619,7 +591,7 @@
if (trace) { log.trace(this + ": " + del + " returned for message:" + ref); }
// Receiver accepted the reference
-
+
// We must synchronize here to cope with a race condition where message
// is cancelled/acked in flight while the following few actions are being
// performed. e.g. delivery could be cancelled acked after being removed from
@@ -666,8 +638,7 @@
}
protected Delivery handleInternal(DeliveryObserver sender, MessageReference ref,
- Transaction tx, boolean persist, boolean synchronous,
- boolean deliver)
+ Transaction tx, boolean persist, boolean synchronous)
{
if (ref == null)
{
@@ -721,10 +692,10 @@
// We only do delivery if there are receivers that haven't said they don't want
// any more references.
- if (receiversReady && deliver)
+ if (receiversReady)
{
// Prompt delivery
- deliverInternal(true);
+ deliverInternal();
}
}
else
@@ -744,7 +715,7 @@
else
{
// add to post commit callback
- getCallback(tx, synchronous, deliver).addRef(ref);
+ getCallback(tx, synchronous).addRef(ref);
if (trace) { log.trace(this + " added transactionally " + ref + " in memory"); }
}
@@ -791,7 +762,7 @@
}
else
{
- this.getCallback(tx, synchronous, false).addDelivery(d);
+ this.getCallback(tx, synchronous).addDelivery(d);
if (trace) { log.trace(this + " added " + d + " to memory on transaction " + tx); }
@@ -819,13 +790,13 @@
return removed;
}
- protected InMemoryCallback getCallback(Transaction tx, boolean synchronous, boolean deliver)
+ protected InMemoryCallback getCallback(Transaction tx, boolean synchronous)
{
InMemoryCallback callback = (InMemoryCallback) tx.getCallback(this);
if (callback == null)
{
- callback = new InMemoryCallback(synchronous, deliver);
+ callback = new InMemoryCallback(synchronous);
tx.addCallback(callback, this);
}
@@ -836,10 +807,6 @@
{
throw new IllegalStateException("Callback synchronousness status doesn't match");
}
- if (callback.isDeliver() != deliver)
- {
- throw new IllegalStateException("Callback deliver status doesn't match");
- }
}
return callback;
@@ -928,6 +895,34 @@
}
}
}
+
+ private MessageReference nextReference(ListIterator iter) throws Throwable
+ {
+ MessageReference ref;
+
+ if (iter == null)
+ {
+ //We just get the next ref from the head of the queue
+ ref = (MessageReference) messageRefs.peekFirst();
+ }
+ else
+ {
+ // TODO This will not work with paged refs - see http://jira.jboss.com/jira/browse/JBMESSAGING-275
+ // We need to extend it to work with refs from the db
+
+ //We have an iterator - this means we are iterating through the queue to find a ref that matches
+ if (iter.hasNext())
+ {
+ ref = (MessageReference)iter.next();
+ }
+ else
+ {
+ ref = null;
+ }
+ }
+
+ return ref;
+ }
// Inner classes -------------------------------------------------
@@ -938,33 +933,24 @@
private List deliveriesToRemove;
private boolean synchronous;
-
- private boolean deliver;
-
+
private boolean committing;
private Future result;
- private InMemoryCallback(boolean synchronous, boolean deliver)
+ private InMemoryCallback(boolean synchronous)
{
refsToAdd = new ArrayList();
deliveriesToRemove = new ArrayList();
this.synchronous = synchronous;
-
- this.deliver = deliver;
}
private boolean isSynchronous()
{
return synchronous;
}
-
- private boolean isDeliver()
- {
- return deliver;
- }
private void addRef(MessageReference ref)
{
@@ -1103,7 +1089,7 @@
private void doAfterCommit() throws Throwable
{
// We add the references to the state
-
+
Iterator iter = refsToAdd.iterator();
while (iter.hasNext())
@@ -1126,7 +1112,7 @@
}
// Remove deliveries
-
+
iter = this.deliveriesToRemove.iterator();
while (iter.hasNext())
@@ -1151,9 +1137,9 @@
}
//prompt delivery
- if (deliver && receiversReady)
+ if (receiversReady)
{
- deliverInternal(true);
+ deliverInternal();
}
}
@@ -1210,9 +1196,7 @@
{
if (router.numberOfReceivers() > 0)
{
- receiversReady = true;
-
- deliverInternal(false);
+ deliverInternal();
}
if (result != null)
{
@@ -1250,7 +1234,7 @@
public void run()
{
- Delivery d = handleInternal(sender, ref, null, persist, false, true);
+ Delivery d = handleInternal(sender, ref, null, persist, false);
result.setResult(d);
}
}
Modified: trunk/src/main/org/jboss/messaging/core/plugin/LockMap.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/LockMap.java 2006-10-13 12:09:35 UTC (rev 1473)
+++ trunk/src/main/org/jboss/messaging/core/plugin/LockMap.java 2006-10-16 22:03:03 UTC (rev 1474)
@@ -49,7 +49,7 @@
int refCount;
}
- public static LockMap instance = new LockMap();
+ public static final LockMap instance = new LockMap();
private LockMap()
{
Modified: trunk/src/main/org/jboss/messaging/core/plugin/contract/ClusteredPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/contract/ClusteredPostOffice.java 2006-10-13 12:09:35 UTC (rev 1473)
+++ trunk/src/main/org/jboss/messaging/core/plugin/contract/ClusteredPostOffice.java 2006-10-16 22:03:03 UTC (rev 1474)
@@ -21,6 +21,8 @@
*/
package org.jboss.messaging.core.plugin.contract;
+import java.util.Collection;
+
import org.jboss.messaging.core.plugin.postoffice.Binding;
import org.jboss.messaging.core.plugin.postoffice.cluster.LocalClusteredQueue;
@@ -53,4 +55,6 @@
* @throws Throwable
*/
Binding unbindClusteredQueue(String queueName) throws Throwable;
+
+ Collection listAllBindingsForCondition(String condition) throws Exception;
}
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java 2006-10-13 12:09:35 UTC (rev 1473)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java 2006-10-16 22:03:03 UTC (rev 1474)
@@ -213,7 +213,7 @@
}
lock.writeLock().acquire();
-
+
try
{
Binding binding = removeBinding(this.nodeId, queueName);
@@ -237,49 +237,11 @@
public Collection listBindingsForCondition(String condition) throws Exception
{
- if (condition == null)
- {
- throw new IllegalArgumentException("Condition is null");
- }
-
- lock.readLock().acquire();
-
- try
- {
- //We should only list the bindings for the local node
+ return listBindingsForConditionInternal(condition, true);
+ }
+
+
- Bindings cb = (Bindings)conditionMap.get(condition);
-
- if (cb == null)
- {
- return Collections.EMPTY_LIST;
- }
- else
- {
- List list = new ArrayList();
-
- Collection bindings = cb.getAllBindings();
-
- Iterator iter = bindings.iterator();
-
- while (iter.hasNext())
- {
- Binding binding = (Binding)iter.next();
-
- if (binding.getNodeId() == this.nodeId)
- {
- list.add(binding);
- }
- }
-
- return list;
- }
- }
- finally
- {
- lock.readLock().release();
- }
- }
public Binding getBindingForQueueName(String queueName) throws Exception
{
@@ -289,7 +251,7 @@
}
lock.readLock().acquire();
-
+
try
{
Map nameMap = (Map)nameMaps.get(new Integer(this.nodeId));
@@ -402,11 +364,57 @@
}
// Protected -----------------------------------------------------
+
+ protected Collection listBindingsForConditionInternal(String condition, boolean localOnly) throws Exception
+ {
+ if (condition == null)
+ {
+ throw new IllegalArgumentException("Condition is null");
+ }
+
+ lock.readLock().acquire();
+
+ try
+ {
+ //When localOnly is true we only list the bindings for the local node
+
+ Bindings cb = (Bindings)conditionMap.get(condition);
+
+ if (cb == null)
+ {
+ return Collections.EMPTY_LIST;
+ }
+ else
+ {
+ List list = new ArrayList();
+
+ Collection bindings = cb.getAllBindings();
+
+ Iterator iter = bindings.iterator();
+
+ while (iter.hasNext())
+ {
+ Binding binding = (Binding)iter.next();
+
+ if (!localOnly || (binding.getNodeId() == this.nodeId))
+ {
+ list.add(binding);
+ }
+ }
+
+ return list;
+ }
+ }
+ finally
+ {
+ lock.readLock().release();
+ }
+ }
protected void loadBindings() throws Exception
{
lock.writeLock().acquire();
-
+
Connection conn = null;
PreparedStatement ps = null;
ResultSet rs = null;
@@ -448,7 +456,7 @@
finally
{
lock.writeLock().release();
-
+
if (rs != null)
{
rs.close();
@@ -593,7 +601,7 @@
nameMaps.put(new Integer(binding.getNodeId()), nameMap);
}
- nameMap.put(binding.getQueue().getName(), binding);
+ nameMap.put(binding.getQueue().getName(), binding);
}
protected void addToConditionMap(Binding binding)
@@ -667,7 +675,7 @@
conditionMap.remove(binding.getCondition());
}
}
-
+
protected Map getDefaultDMLStatements()
{
Map map = new LinkedHashMap();
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusterRequest.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusterRequest.java 2006-10-13 12:09:35 UTC (rev 1473)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusterRequest.java 2006-10-16 22:03:03 UTC (rev 1474)
@@ -54,6 +54,11 @@
request = new BindRequest();
break;
}
+ case PullMessagesResultRequest.TYPE:
+ {
+ request = new PullMessagesResultRequest();
+ break;
+ }
case MessageRequest.TYPE:
{
request = new MessageRequest();
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java 2006-10-13 12:09:35 UTC (rev 1473)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java 2006-10-16 22:03:03 UTC (rev 1474)
@@ -247,6 +247,11 @@
public synchronized void start() throws Exception
{
+ if (started)
+ {
+ log.warn("Attempt to start() but " + this + " is already started");
+ }
+
if (syncChannelConfigE != null)
{
this.syncChannel = new JChannel(syncChannelConfigE);
@@ -301,6 +306,11 @@
public synchronized void stop() throws Exception
{
+ if (!started)
+ {
+ log.warn("Attempt to stop() but " + this + " is not started");
+ }
+
super.stop();
statsSender.stop();
@@ -322,7 +332,7 @@
{
log.trace(this.nodeId + " binding clustered queue: " + queue + " with condition: " + condition);
}
-
+
if (queue.getNodeId() != this.nodeId)
{
throw new IllegalArgumentException("Queue node id does not match office node id");
@@ -345,7 +355,7 @@
{
log.trace(this.nodeId + " unbind clustered queue: " + queueName);
}
-
+
Binding binding = (Binding)super.unbindQueue(queueName);
UnbindRequest request = new UnbindRequest(nodeId, queueName);
@@ -375,7 +385,7 @@
boolean routed = false;
lock.readLock().acquire();
-
+
try
{
ClusteredBindings cb = (ClusteredBindings)conditionMap.get(condition);
@@ -533,6 +543,11 @@
return false;
}
+ public Collection listAllBindingsForCondition(String condition) throws Exception
+ {
+ return listBindingsForConditionInternal(condition, false);
+ }
+
// PostOfficeInternal implementation ------------------------------------------------------------------
/*
@@ -543,7 +558,7 @@
throws Exception
{
lock.writeLock().acquire();
-
+
if (trace)
{
log.trace(this.nodeId + " adding binding from node: " + nodeId + " queue: " + queueName + " with condition: " + condition);
@@ -614,7 +629,7 @@
public void handleAddressNodeMapping(NodeAddressInfo info, int nodeId) throws Exception
{
lock.writeLock().acquire();
-
+
if (trace)
{
log.trace(this.nodeId + " Adding address node mapping for " + info.getSyncChannelAddress() +
@@ -641,7 +656,7 @@
}
lock.readLock().acquire();
-
+
// Need to reference the message
MessageReference ref = null;
try
@@ -747,40 +762,6 @@
}
/*
- * Unicast a sync request
- */
- public Object syncSendRequest(ClusterRequest request, int nodeId, boolean ignoreNoAddress) throws Exception
- {
- if (trace) { log.trace(this.nodeId + " sending synch request to single node, request: " + request + " node " + nodeId); }
-
- Address address = this.getAddressForNodeId(nodeId, true);
-
- if (trace) { log.trace(this.nodeId + " sending to address " + address); }
-
- if (address == null)
- {
- if (ignoreNoAddress)
- {
- return null;
- }
- else
- {
- throw new IllegalArgumentException("Cannot find address for node " + nodeId);
- }
- }
-
- byte[] bytes = writeRequest(request);
-
- Message message = new Message(address, null, bytes);
-
- Object result = controlMessageDispatcher.sendMessage(message, GroupRequest.GET_FIRST, castTimeout);
-
- if (trace) { log.trace(this.nodeId + " received response: " + result); }
-
- return result;
- }
-
- /*
* We put the transaction in the holding area
*/
public void holdTransaction(TransactionId id, ClusterTransaction tx) throws Exception
@@ -798,10 +779,10 @@
if (trace) { log.trace(this.nodeId + " committing transaction " + id ); }
ClusterTransaction tx = null;
-
+
synchronized (holdingArea)
{
- tx = (ClusterTransaction)holdingArea.remove(id);
+ tx = (ClusterTransaction)holdingArea.remove(id);
}
if (tx == null)
@@ -872,7 +853,7 @@
if (trace) { log.trace(this.nodeId + " check complete"); }
}
- public synchronized void sendQueueStats() throws Exception
+ public void sendQueueStats() throws Exception
{
if (!started)
{
@@ -880,7 +861,7 @@
}
lock.readLock().acquire();
-
+
List statsList = null;
try
@@ -934,7 +915,7 @@
public void updateQueueStats(int nodeId, List statsList) throws Exception
{
lock.readLock().acquire();
-
+
if (trace) { log.trace(this.nodeId + " updating queue stats from node " + nodeId + " stats size: " + statsList.size()); }
try
@@ -983,6 +964,7 @@
if (localQueue != null)
{
+ //TODO - the call to getQueues is too slow since it creates a new list and adds the local queue!!!
RemoteQueueStub toQueue = (RemoteQueueStub)messagePullPolicy.chooseQueue(router.getQueues());
if (trace) { log.trace(this.nodeId + " recalculated pull queue for queue " + st.getQueueName() + " to be " + toQueue); }
@@ -1013,31 +995,59 @@
{
return pm.referenceExists(channelID, messageID);
}
+
- public List getDeliveries(String queueName, int numMessages) throws Exception
+ public void handleMessagePullResult(int remoteNodeId, long holdingTxId, String queueName, List messages) throws Throwable
{
- if (trace) { log.trace(this.nodeId + " getting max " + numMessages + " deliveries for " + queueName); }
-
+ if (trace) { log.trace(this.nodeId + " handling pull result " + messages + " for " + queueName); }
+
Binding binding = getBindingForQueueName(queueName);
if (binding == null)
{
- throw new IllegalArgumentException("Cannot find binding for queue " + queueName);
+ //This might happen if the queue is unbound
+ return;
}
+
+ LocalClusteredQueue localQueue = (LocalClusteredQueue)binding.getQueue();
+
+ RemoteQueueStub remoteQueue = localQueue.getPullQueue();
- LocalClusteredQueue queue = (LocalClusteredQueue)binding.getQueue();
+ if (remoteNodeId != remoteQueue.getNodeId())
+ {
+ //It might have changed since the request was sent
+ Map bindings = (Map)nameMaps.get(new Integer(remoteNodeId));
+
+ if (bindings != null)
+ {
+ binding = (Binding)bindings.get(queueName);
+
+ if (binding != null)
+ {
+ remoteQueue = (RemoteQueueStub)binding.getQueue();
+ }
+ }
+ }
- List dels = queue.getDeliveries(numMessages);
-
- if (trace) { log.trace(this.nodeId + " retrieved " + dels.size() + " deliveries from " + queueName); }
-
- return dels;
+ if (remoteQueue != null)
+ {
+ localQueue.handlePullMessagesResult(remoteQueue, messages, holdingTxId);
+ }
+ else
+ {
+ //TODO need to send a rollback to the remote queue otherwise will get leak on remote node
+ //in holding area
+ }
}
-
+
+ public int getNodeId()
+ {
+ return nodeId;
+ }
+
// Public ------------------------------------------------------------------------------------------
-
-
+
//Used for testing only
public void setFail(boolean beforeCommit, boolean afterCommit)
{
@@ -1081,7 +1091,7 @@
bindings.addRouter(queueName, router);
}
- router.add(binding.getQueue());
+ router.add(binding.getQueue());
}
protected void removeFromConditionMap(Binding binding)
@@ -1233,7 +1243,7 @@
private void removeBindingsForAddress(Integer nodeId) throws Exception
{
lock.writeLock().acquire();
-
+
try
{
Map nameMap = (Map)nameMaps.get(nodeId);
@@ -1379,9 +1389,9 @@
}
private Address getAddressForNodeId(int nodeId, boolean sync) throws Exception
- {
+ {
lock.readLock().acquire();
-
+
try
{
NodeAddressInfo info = (NodeAddressInfo)nodeIdAddressesMap.get(new Integer(nodeId));
@@ -1447,7 +1457,7 @@
}
public void setState(byte[] bytes)
- {
+ {
if (bytes != null)
{
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultRouter.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultRouter.java 2006-10-13 12:09:35 UTC (rev 1473)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultRouter.java 2006-10-16 22:03:03 UTC (rev 1474)
@@ -204,12 +204,21 @@
public List getQueues()
{
- return nonLocalQueues;
+ List queues = new ArrayList();
+
+ if (localQueue != null)
+ {
+ queues.add(localQueue);
+ }
+
+ queues.addAll(nonLocalQueues);
+
+ return queues;
}
public int numberOfReceivers()
{
- return nonLocalQueues.size();
+ return nonLocalQueues.size() + (localQueue != null ? 1 : 0);
}
}
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java 2006-10-13 12:09:35 UTC (rev 1473)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java 2006-10-16 22:03:03 UTC (rev 1474)
@@ -25,7 +25,6 @@
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
-import java.util.ListIterator;
import org.jboss.logging.Logger;
import org.jboss.messaging.core.Delivery;
@@ -37,11 +36,8 @@
import org.jboss.messaging.core.plugin.contract.PersistenceManager;
import org.jboss.messaging.core.plugin.contract.PostOffice;
import org.jboss.messaging.core.tx.Transaction;
-import org.jboss.messaging.core.tx.TransactionException;
import org.jboss.messaging.core.tx.TransactionRepository;
-import org.jboss.messaging.core.tx.TxCallback;
import org.jboss.messaging.util.Future;
-import org.jboss.messaging.util.StreamUtils;
import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
@@ -105,6 +101,11 @@
{
this.pullQueue = queue;
}
+
+ public RemoteQueueStub getPullQueue()
+ {
+ return pullQueue;
+ }
public QueueStats getStats()
{
@@ -141,18 +142,6 @@
}
/*
- * Used when pulling messages from a remote queue
- */
- public List getDeliveries(int number) throws Exception
- {
- Future result = new Future();
-
- this.executor.execute(new GetDeliveriesRunnable(result, 1));
-
- return (List)result.getResult();
- }
-
- /*
* This is the same as the normal handle() method on the Channel except it doesn't
* persist the message even if it is persistent - this is because persistent messages
* are always persisted on the sending node before sending.
@@ -189,169 +178,30 @@
acknowledgeInternal(d, null, false, false);
}
+ public void handlePullMessagesResult(RemoteQueueStub remoteQueue, List messages, long holdingTxId) throws Exception
+ {
+ //This needs to be run on a different thread to the one used by JGroups to deliver the message
+ //to avoid deadlock
+ Runnable runnable = new MessagePullResultRunnable(remoteQueue, messages, holdingTxId);
+
+ executor.execute(runnable);
+ }
- protected MessageReference nextReference(ListIterator iter, boolean handle) throws Throwable
+ //TODO it's not ideal that we need to pass in a PullMessagesRequest
+ public void handleGetDeliveriesRequest(int returnNodeId, int number, TransactionId txId, PullMessagesRequest tx) throws Exception
{
- MessageReference ref = super.nextReference(iter, handle);
+ //This needs to be run on a different thread to the one used by JGroups to deliver the message
+ //to avoid deadlock
+ Runnable runnable = new MessagePullRequestRunnable(returnNodeId, number, txId, tx);
- if (ref == null)
- {
- //There are no available refs in the local queue
- //Maybe we need to pull one (some) from a remote queue?
-
- if (pullMessages())
- {
- ref = super.nextReference(iter, handle);
- }
- }
-
- return ref;
+ executor.execute(runnable);
}
-
+
public boolean isClustered()
{
return true;
}
-
- /**
- * Pull messages from a remote queue to this queue.
- * If any of the messages are reliable then this needs to be done reliable (i.e. without loss or redelivery)
- * Normally this would require 2PC which would make performance suck.
- * However since we know both queues share the same DB then we can do the persistence locally in the same
- * tx thus avoiding 2PC and maintaining reliability:)
- * We do the following:
- *
- * 1. A tx is started locally
- * 2. Create deliveries for message(s) on the remote node - bring messages back to the local node
- * We send a message to the remote node to retrieve a set of deliveries from the queue - it gets a max of num
- * deliveries.
- * The unreliable ones can be acknowledged immediately, the reliable ones are not acknowledged and a holding transaction
- * is placed in the holding area on the remote node, which contains knowledge of the deliveries.
- * The messages corresponding to the deliveries are returned to the local node
- * 3. The retrieved messages are added to the local queue in the tx
- * 4. Deliveries corresponding to the messages retrieved are acknowledged LOCALLY for the remote queue.
- * 5. The local tx is committed.
- * 6. Send "commit" message to remote node
- * 7. "Commit" message is received and deliveries in the holding transaction are acknowledged IN MEMORY only.
- * On failure, commit or rollback will be called on the holding transaction causing the deliveries to be acked or cancelled
- * depending on whether they exist in the database
- *
- * Recovery is handled in the same way as CastMessagesCallback
- *
- * This method will always be executed on the channel's event queue (via the deliver method)
- * so no need to do any handles or acks inside another event message
- */
- private boolean pullMessages() throws Throwable
- {
- if (pullQueue == null)
- {
- return false;
- }
-
- //TODO we can optimise this for the case when only one message is pulled
- //and when only non persistent messages are pulled - i.e. we don't need
- //to create a transaction.
-
- RemoteQueueStub theQueue = pullQueue;
-
- Transaction tx = tr.createTransaction();
-
- ClusterRequest req = new PullMessagesRequest(this.nodeId, tx.getId(), theQueue.getChannelID(),
- name, 1);
-
- if (trace)
- {
- log.trace(System.identityHashCode(this) + " Executing pull messages request for queue " + name +
- " pulling from node " + theQueue.getNodeId() + " to node " + this.nodeId);
- }
-
- log.info("==================== Executing pull messages request");
- byte[] bytes = (byte[])office.syncSendRequest(req, theQueue.getNodeId(), true);
- log.info("==================== Executed pull messages request");
-
- if (bytes == null)
- {
- //Ok - node might have left the group
- return false;
- }
-
- PullMessagesResponse response = new PullMessagesResponse();
-
- StreamUtils.fromBytes(response, bytes);
-
- List msgs = response.getMessages();
-
- if (trace) { log.trace(System.identityHashCode(this) + " I retrieved " + msgs.size() + " messages from pull"); }
-
- Iterator iter = msgs.iterator();
-
- boolean containsReliable = false;
-
- while (iter.hasNext())
- {
- org.jboss.messaging.core.Message msg = (org.jboss.messaging.core.Message)iter.next();
-
- if (msg.isReliable())
- {
- //It will already have been persisted on the other node
- msg.setPersisted(true);
-
- containsReliable = true;
- }
-
- MessageReference ref = null;
-
- try
- {
- ref = ms.reference(msg);
-
- //It's ok to call this directly since this method is only ever called by the delivery thread
- //We call it with the deliver parameter set to false - this prevents delivery being done
- //after the ref is added - if delivery was done we would end up in recursion.
- Delivery delRet = handleInternal(null, ref, tx, true, true, false);
-
- if (delRet == null || !delRet.isSelectorAccepted())
- {
- //This should never happen
- throw new IllegalStateException("Queue did not accept reference!");
- }
-
- }
- finally
- {
- if (ref != null)
- {
- ref.releaseMemoryReference();
- }
- }
-
- //Acknowledge on the remote queue stub
- Delivery del = new SimpleDelivery(theQueue, ref);
-
- del.acknowledge(tx);
- }
-
- tx.commit();
-
- //TODO what if commit throws an exception - this means the commit message doesn't hit the
- //remote node so the holding transaction stays in the holding area
- //Need to catch the exception and throw a check message
- //What we need to do is catch any exceptions at the top of the call, i.e. just after the interface
- //and send a checkrequest
- //This applies to a normal message and messages requests too
-
- //We only need to send a commit message if there were reliable messages since otherwise
- //the transaction wouldn't have been added in the holding area
- if (containsReliable && isRecoverable())
- {
- req = new PullMessagesRequest(this.nodeId, tx.getId());
-
- office.asyncSendRequest(req, theQueue.getNodeId());
- }
-
- return !msgs.isEmpty();
- }
-
+
public int getRefCount()
{
//We are only interested in getting the reference count when delivery is not in progress
@@ -372,6 +222,46 @@
return ((Integer)result.getResult()).intValue();
}
+ protected void deliverInternal() throws Throwable
+ {
+ super.deliverInternal();
+
+ //If the receivers are still ready to accept more refs then we might pull messages
+ //from a remote queue
+
+ if (receiversReady && pullQueue != null)
+ {
+ //We send a message to the remote queue to pull a message - the remote queue will then send back
+ //another message asynchronously with the result.
+ //We don't do this synchronously with a message dispatcher since that can lead to distributed
+ //deadlock
+
+ sendPullMessage();
+ }
+ }
+
+ private void sendPullMessage() throws Exception
+ {
+ if (pullQueue == null)
+ {
+ //Nothing to do
+ return;
+ }
+
+ //Avoid synchronization
+ RemoteQueueStub theQueue = pullQueue;
+
+ if (theQueue == null)
+ {
+ return;
+ }
+
+ executor.execute(new SendPullRequestRunnable(theQueue));
+ }
+
+ /*
+ * Get the ref count - executed on event queue
+ */
private class GetRefCountRunnable implements Runnable
{
Future result;
@@ -389,24 +279,94 @@
}
}
- private class GetDeliveriesRunnable implements Runnable
+ /*
+ * Send a message pull request.
+ *
+ * TODO - do we really need this class?
+ * Why can't we just execute on the same thread?
+ */
+ private class SendPullRequestRunnable implements Runnable
{
- Future result;
+ private RemoteQueueStub theQueue;
+ private SendPullRequestRunnable(RemoteQueueStub theQueue)
+ {
+ this.theQueue = theQueue;
+ }
+
+ public void run()
+ {
+ try
+ {
+ //TODO
+ //We create a tx just so we get the id - we could just get the id directly from the id
+ //manager
+ Transaction tx = tr.createTransaction();
+
+ ClusterRequest req = new PullMessagesRequest(nodeId, tx.getId(), theQueue.getChannelID(),
+ name, 1);
+
+ office.asyncSendRequest(req, theQueue.getNodeId());
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to pull message", e);
+ }
+ }
+
+ }
+
+ /**
+ * This is how we "pull" messages from one node to another
+ * If any of the messages are reliable then this needs to be done reliably (i.e. without loss or redelivery)
+ * Normally this would require 2PC which would make performance suck.
+ * However since we know both queues share the same DB then we can do the persistence locally in the same
+ * tx thus avoiding 2PC and maintaining reliability :)
+ * We do the following:
+ *
+ * 1. Send a PullMessagesRequest to the remote node, on receipt it will create deliveries for message(s), and
+ * possibly add a holding tx (if there are any persistent messages), the messages will then be returned in
+ * a PullMessagesResultRequest which is sent asynchronously from the remote node back to here to avoid
+ * distributed deadlock.
+ * 2. When the result is returned it hits this method.
+ * 3. The retrieved messages are added to the local queue in the tx
+ * 4. Deliveries corresponding to the messages retrieved are acknowledged LOCALLY for the remote queue.
+ * 5. The local tx is committed.
+ * 6. Send "commit" message to remote node
+ * 7. "Commit" message is received and deliveries in the holding transaction are acknowledged IN MEMORY only.
+ * On failure, commit or rollback will be called on the holding transaction causing the deliveries to be acked or cancelled
+ * depending on whether they exist in the database
+ *
+ * Recovery is handled in the same way as CastMessagesCallback
+ *
+ */
+
+ private class MessagePullRequestRunnable implements Runnable
+ {
+ int returnNodeId;
+
int number;
- public GetDeliveriesRunnable(Future result, int number)
- {
- this.result = result;
+ TransactionId txId;
+
+ PullMessagesRequest tx;
+
+ public MessagePullRequestRunnable(int returnNodeId, int number, TransactionId txId, PullMessagesRequest tx)
+ {
+ this.returnNodeId = returnNodeId;
this.number = number;
+
+ this.txId = txId;
+
+ this.tx = tx;
}
public void run()
{
try
{
- List list = null;
+ List dels = null;
//We only get the refs if receiversReady = false so as not to steal messages that
//might be consumed by local receivers
@@ -416,7 +376,7 @@
MessageReference ref;
- list = new ArrayList();
+ dels = new ArrayList();
synchronized (refLock)
{
@@ -428,7 +388,7 @@
deliveries.add(del);
- list.add(del);
+ dels.add(del);
count++;
}
@@ -437,75 +397,159 @@
}
else
{
- list = Collections.EMPTY_LIST;
+ dels = Collections.EMPTY_LIST;
}
- result.setResult(list);
+ if (trace) { log.trace("PullMessagesRunnable got " + dels.size() + " deliveries"); }
+
+ PullMessagesResultRequest response = new PullMessagesResultRequest(LocalClusteredQueue.this.nodeId, txId.getTxId(), name, dels.size());
+
+ List reliableDels = null;
+
+ if (!dels.isEmpty())
+ {
+ Iterator iter = dels.iterator();
+
+ Delivery del = (Delivery)iter.next();
+
+ if (del.getReference().isReliable())
+ {
+ //Add it to internal list
+ if (reliableDels == null)
+ {
+ reliableDels = new ArrayList();
+ }
+
+ reliableDels.add(del);
+ }
+ else
+ {
+ //We can ack it now
+ del.acknowledge(null);
+ }
+
+ response.addMessage(del.getReference().getMessage());
+ }
+
+ if (reliableDels != null)
+ {
+ //Add this to the holding area
+ tx.setReliableDels(reliableDels);
+ office.holdTransaction(txId, tx);
+ }
+
+ //We send the messages asynchronously to avoid a deadlock situation which can occur
+ //if we were using MessageDispatcher to get the messages.
+
+ office.asyncSendRequest(response, returnNodeId);
}
- catch (Exception e)
+ catch (Throwable e)
{
- result.setException(e);
+ log.error("Failed to get deliveries", e);
}
}
}
- private class AddReferencesCallback implements TxCallback
+ private class MessagePullResultRunnable implements Runnable
{
- private List references;
+ private RemoteQueueStub remoteQueue;
- private AddReferencesCallback(List references)
+ private List messages;
+
+ private long holdingTxId;
+
+ private MessagePullResultRunnable(RemoteQueueStub remoteQueue,
+ List messages, long holdingTxId)
{
- this.references = references;
+ this.remoteQueue = remoteQueue;
+
+ this.messages = messages;
+
+ this.holdingTxId = holdingTxId;
}
- public void afterCommit(boolean onePhase) throws Exception
+ public void run()
{
- Iterator iter = references.iterator();
-
- while (iter.hasNext())
+ try
{
- MessageReference ref = (MessageReference) iter.next();
-
- if (trace) { log.trace(this + ": adding " + ref + " to non-recoverable state"); }
-
- try
+ // TODO we should optimise for the case when only one message is pulled which is basically all
+ //we support now anyway
+ //Also we should optimise for the case when only non persistent messages are pulled
+ //in this case we don't need to create a tx.
+
+ Transaction tx = tr.createTransaction();
+
+ Iterator iter = messages.iterator();
+
+ boolean containsReliable = false;
+
+ while (iter.hasNext())
{
- synchronized (refLock)
+ org.jboss.messaging.core.Message msg = (org.jboss.messaging.core.Message)iter.next();
+
+ if (msg.isReliable())
{
- addReferenceInMemory(ref);
+ //It will already have been persisted on the other node
+ //so we need to set the persisted flag here
+ msg.setPersisted(true);
+
+ containsReliable = true;
}
+
+ MessageReference ref = null;
+
+ try
+ {
+ ref = ms.reference(msg);
+
+ //Should be executed synchronously since we are already in the event queue
+ Delivery delRet = handleInternal(null, ref, tx, true, true);
+
+ if (delRet == null || !delRet.isSelectorAccepted())
+ {
+ //This should never happen
+ throw new IllegalStateException("Queue did not accept reference!");
+ }
+ }
+ finally
+ {
+ if (ref != null)
+ {
+ ref.releaseMemoryReference();
+ }
+ }
+
+ //Acknowledge on the remote queue stub
+ Delivery del = new SimpleDelivery(remoteQueue, ref);
+
+ del.acknowledge(tx);
}
- catch (Throwable t)
- {
- throw new TransactionException("Failed to add reference", t);
- }
+
+ tx.commit();
+
+ //TODO what if commit throws an exception - this means the commit message doesn't hit the
+ //remote node so the holding transaction stays in the holding area
+ //Need to catch the exception and throw a check message
+ //What we need to do is catch any exceptions at the top of the call, i.e. just after the interface
+ //and send a checkrequest
+ //This applies to a normal message and messages requests too
+
+ //We only need to send a commit message if there were reliable messages since otherwise
+ //the transaction wouldn't have been added in the holding area
+ if (containsReliable && isRecoverable())
+ {
+ ClusterRequest req = new PullMessagesRequest(nodeId, holdingTxId);
+
+ office.asyncSendRequest(req, remoteQueue.getNodeId());
+ }
+ }
+ catch (Throwable e)
+ {
+ log.error("Failed to handle pulled message", e);
}
}
-
- public void afterPrepare() throws Exception
- {
- //NOOP
- }
-
- public void afterRollback(boolean onePhase) throws Exception
- {
- //NOOP
- }
-
- public void beforeCommit(boolean onePhase) throws Exception
- {
- //NOOP
- }
-
- public void beforePrepare() throws Exception
- {
- //NOOP
- }
-
- public void beforeRollback(boolean onePhase) throws Exception
- {
- //NOOP
- }
}
+
+
}
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PostOfficeInternal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PostOfficeInternal.java 2006-10-13 12:09:35 UTC (rev 1473)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PostOfficeInternal.java 2006-10-16 22:03:03 UTC (rev 1474)
@@ -58,8 +58,6 @@
void asyncSendRequest(ClusterRequest request, int nodeId) throws Exception;
- Object syncSendRequest(ClusterRequest request, int nodeId, boolean ignoreNoAddress) throws Exception;
-
void holdTransaction(TransactionId id, ClusterTransaction tx) throws Throwable;
void commitTransaction(TransactionId id) throws Throwable;
@@ -69,4 +67,6 @@
void sendQueueStats() throws Exception;
boolean referenceExistsInStorage(long channelID, long messageID) throws Exception;
+
+ void handleMessagePullResult(int remoteNodeId, long holdingTxId, String queueName, List messages) throws Throwable;
}
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesRequest.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesRequest.java 2006-10-13 12:09:35 UTC (rev 1473)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesRequest.java 2006-10-16 22:03:03 UTC (rev 1474)
@@ -23,14 +23,12 @@
import java.io.DataInputStream;
import java.io.DataOutputStream;
-import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.jboss.logging.Logger;
import org.jboss.messaging.core.Delivery;
import org.jboss.messaging.core.plugin.postoffice.Binding;
-import org.jboss.messaging.util.StreamUtils;
/**
* A PullMessagesRequest
@@ -90,47 +88,9 @@
LocalClusteredQueue queue = (LocalClusteredQueue)binding.getQueue();
- List dels = queue.getDeliveries(numMessages);
-
- if (trace) { log.trace("PullMessagesRequest got " + dels.size() + " deliveries"); }
-
- PullMessagesResponse response = new PullMessagesResponse(dels.size());
-
- if (!dels.isEmpty())
- {
- Iterator iter = dels.iterator();
-
- Delivery del = (Delivery)iter.next();
-
- if (del.getReference().isReliable())
- {
- //Add it to internal list
- if (reliableDels == null)
- {
- reliableDels = new ArrayList();
- }
-
- reliableDels.add(del);
- }
- else
- {
- //We can ack it now
- del.acknowledge(null);
- }
-
- response.addMessage(del.getReference().getMessage());
- }
-
- if (reliableDels != null)
- {
- //Add this to the holding area
- office.holdTransaction(id, this);
- }
+ queue.handleGetDeliveriesRequest(nodeId, numMessages, id, this);
- //Convert to bytes since the response isn't serializable (nor do we want it to be)
- byte[] bytes = StreamUtils.toBytes(response);
-
- return bytes;
+ return null;
}
else
{
@@ -139,6 +99,12 @@
return null;
}
}
+
+ //TODO this is a bit messy - must be a nicer way of setting this
+ void setReliableDels(List reliableDels)
+ {
+ this.reliableDels = reliableDels;
+ }
byte getType()
{
Deleted: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesResponse.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesResponse.java 2006-10-13 12:09:35 UTC (rev 1473)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesResponse.java 2006-10-16 22:03:03 UTC (rev 1474)
@@ -1,99 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.messaging.core.plugin.postoffice.cluster;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-import org.jboss.messaging.core.Message;
-import org.jboss.messaging.core.message.MessageFactory;
-import org.jboss.messaging.util.Streamable;
-
-/**
- * A PullMessagesResponse
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @version <tt>$Revision: 1.1 $</tt>
- *
- * $Id$
- *
- */
-public class PullMessagesResponse implements Streamable
-{
- private List messages;
-
- PullMessagesResponse()
- {
- }
-
- PullMessagesResponse(int size)
- {
- messages = new ArrayList(size);
- }
-
- void addMessage(Message msg)
- {
- messages.add(msg);
- }
-
- List getMessages()
- {
- return messages;
- }
-
- public void read(DataInputStream in) throws Exception
- {
- int num = in.readInt();
-
- messages = new ArrayList(num);
-
- for (int i = 0; i < num; i++)
- {
- byte type = in.readByte();
-
- Message msg = MessageFactory.createMessage(type);
-
- msg.read(in);
-
- messages.add(msg);
- }
- }
-
- public void write(DataOutputStream out) throws Exception
- {
- out.writeInt(messages.size());
-
- Iterator iter = messages.iterator();
-
- while (iter.hasNext())
- {
- Message msg = (Message)iter.next();
-
- out.writeByte(msg.getType());
-
- msg.write(out);
- }
- }
-}
Copied: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesResultRequest.java (from rev 1473, trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesResponse.java)
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesResponse.java 2006-10-13 12:09:35 UTC (rev 1473)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesResultRequest.java 2006-10-16 22:03:03 UTC (rev 1474)
@@ -0,0 +1,137 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.plugin.postoffice.cluster;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.jboss.messaging.core.Message;
+import org.jboss.messaging.core.message.MessageFactory;
+
+/**
+ *
+ * A PullMessagesResultRequest
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ *
+ * $Id$
+ *
+ */
+public class PullMessagesResultRequest extends ClusterRequest
+{
+ public static final int TYPE = 2;
+
+ private long holdingTxId;
+
+ private String queueName;
+
+ private List messages;
+
+ private int remoteNodeId;
+
+ PullMessagesResultRequest()
+ {
+ }
+
+ PullMessagesResultRequest(int remoteNodeId, long holdingTxId, String queueName, int size)
+ {
+ this.remoteNodeId = remoteNodeId;
+
+ this.holdingTxId = holdingTxId;
+
+ this.queueName = queueName;
+
+ messages = new ArrayList(size);
+ }
+
+ void addMessage(Message msg)
+ {
+ messages.add(msg);
+ }
+
+ List getMessages()
+ {
+ return messages;
+ }
+
+ public void read(DataInputStream in) throws Exception
+ {
+ remoteNodeId = in.readInt();
+
+ holdingTxId = in.readLong();
+
+ queueName = in.readUTF();
+
+ int num = in.readInt();
+
+ messages = new ArrayList(num);
+
+ for (int i = 0; i < num; i++)
+ {
+ byte type = in.readByte();
+
+ Message msg = MessageFactory.createMessage(type);
+
+ msg.read(in);
+
+ messages.add(msg);
+ }
+ }
+
+ public void write(DataOutputStream out) throws Exception
+ {
+ out.writeInt(remoteNodeId);
+
+ out.writeLong(holdingTxId);
+
+ out.writeUTF(queueName);
+
+ out.writeInt(messages.size());
+
+ Iterator iter = messages.iterator();
+
+ while (iter.hasNext())
+ {
+ Message msg = (Message)iter.next();
+
+ out.writeByte(msg.getType());
+
+ msg.write(out);
+ }
+ }
+
+ Object execute(PostOfficeInternal office) throws Throwable
+ {
+ office.handleMessagePullResult(remoteNodeId, holdingTxId, queueName, messages);
+
+ return null;
+ }
+
+ byte getType()
+ {
+ return TYPE;
+ }
+}
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/TransactionId.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/TransactionId.java 2006-10-13 12:09:35 UTC (rev 1473)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/TransactionId.java 2006-10-16 22:03:03 UTC (rev 1474)
@@ -87,7 +87,7 @@
TransactionId tother = (TransactionId)other;
- return tother.txId == this.txId && tother.nodeId == this.nodeId;
+ return (tother.txId == this.txId) && (tother.nodeId == this.nodeId);
}
public String toString()
Modified: trunk/src/main/org/jboss/messaging/core/tx/Transaction.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/tx/Transaction.java 2006-10-13 12:09:35 UTC (rev 1473)
+++ trunk/src/main/org/jboss/messaging/core/tx/Transaction.java 2006-10-16 22:03:03 UTC (rev 1474)
@@ -208,7 +208,7 @@
if (trace) { log.trace("executing after commit hooks " + this); }
if (firstCallback != null)
- {
+ {
firstCallback.afterCommit(onePhase);
}
Modified: trunk/src/main/org/jboss/messaging/core/tx/TransactionRepository.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/tx/TransactionRepository.java 2006-10-13 12:09:35 UTC (rev 1473)
+++ trunk/src/main/org/jboss/messaging/core/tx/TransactionRepository.java 2006-10-16 22:03:03 UTC (rev 1474)
@@ -179,7 +179,7 @@
public Transaction createTransaction() throws Exception
{
Transaction tx = new Transaction(idManager.getId());
-
+
if (trace) { log.trace("created transaction " + tx); }
return tx;
Modified: trunk/tests/build.xml
===================================================================
--- trunk/tests/build.xml 2006-10-13 12:09:35 UTC (rev 1473)
+++ trunk/tests/build.xml 2006-10-16 22:03:03 UTC (rev 1474)
@@ -91,7 +91,7 @@
<property name="junit.haltonfailure" value="false"/>
<property name="junit.fork" value="true"/>
<property name="junit.includeantruntime" value="true"/>
- <property name="junit.timeout" value="300000"/>
+ <property name="junit.timeout" value="1200000"/>
<property name="stress.timeout" value="4800000"/>
<property name="junit.showoutput" value="true"/>
Modified: trunk/tests/src/org/jboss/test/messaging/core/paging/base/PagingStateTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/paging/base/PagingStateTestBase.java 2006-10-13 12:09:35 UTC (rev 1473)
+++ trunk/tests/src/org/jboss/test/messaging/core/paging/base/PagingStateTestBase.java 2006-10-16 22:03:03 UTC (rev 1474)
@@ -476,9 +476,7 @@
{
long msgId = rs.getLong(1);
long pageOrd = rs.getLong(3);
-
- //log.info("Exists " + msgId + " with page ord " + pageOrd);
-
+
msgIds.add(new Long(msgId));
}
rs.close();
Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeTest.java 2006-10-13 12:09:35 UTC (rev 1473)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeTest.java 2006-10-16 22:03:03 UTC (rev 1474)
@@ -99,12 +99,11 @@
//Add a couple of bindings
- LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, 1, "sub1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
-
+ LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, 1, "sub1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
Binding binding1 =
office1.bindClusteredQueue("topic1", queue1);
+
LocalClusteredQueue queue2 = new LocalClusteredQueue(office1, 1, "sub2", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
-
Binding binding2 =
office1.bindClusteredQueue("topic1", queue2);
@@ -112,7 +111,7 @@
office2 = createClusteredPostOffice(2, "testgroup");
- Collection bindings = office2.listBindingsForCondition("topic1");
+ Collection bindings = office2.listAllBindingsForCondition("topic1");
assertNotNull(bindings);
assertEquals(2, bindings.size());
@@ -129,7 +128,7 @@
//Make sure both nodes pick it up
- bindings = office1.listBindingsForCondition("topic1");
+ bindings = office1.listAllBindingsForCondition("topic1");
assertNotNull(bindings);
assertEquals(3, bindings.size());
@@ -138,7 +137,7 @@
assertEquivalent(binding2, (Binding)iter.next());
assertEquivalent(binding3, (Binding)iter.next());
- bindings = office2.listBindingsForCondition("topic1");
+ bindings = office2.listAllBindingsForCondition("topic1");
assertNotNull(bindings);
assertEquals(3, bindings.size());
@@ -155,7 +154,7 @@
// Make sure both nodes pick it up
- bindings = office1.listBindingsForCondition("topic1");
+ bindings = office1.listAllBindingsForCondition("topic1");
assertNotNull(bindings);
assertEquals(4, bindings.size());
@@ -165,7 +164,7 @@
assertEquivalent(binding3, (Binding)iter.next());
assertEquivalent(binding4, (Binding)iter.next());
- bindings = office2.listBindingsForCondition("topic1");
+ bindings = office2.listAllBindingsForCondition("topic1");
assertNotNull(bindings);
assertEquals(4, bindings.size());
@@ -181,7 +180,7 @@
//Make sure bindings are not longer available on either node
- bindings = office1.listBindingsForCondition("topic1");
+ bindings = office1.listAllBindingsForCondition("topic1");
assertNotNull(bindings);
assertEquals(2, bindings.size());
@@ -189,7 +188,7 @@
assertEquivalent(binding3, (Binding)iter.next());
assertEquivalent(binding4, (Binding)iter.next());
- bindings = office2.listBindingsForCondition("topic1");
+ bindings = office2.listAllBindingsForCondition("topic1");
assertNotNull(bindings);
assertEquals(2, bindings.size());
@@ -203,7 +202,7 @@
//Maks sure it picks up the bindings
- bindings = office3.listBindingsForCondition("topic1");
+ bindings = office3.listAllBindingsForCondition("topic1");
assertNotNull(bindings);
assertEquals(2, bindings.size());
@@ -220,7 +219,7 @@
// Make sure all nodes pick it up
- bindings = office1.listBindingsForCondition("topic1");
+ bindings = office1.listAllBindingsForCondition("topic1");
assertNotNull(bindings);
assertEquals(3, bindings.size());
@@ -229,7 +228,7 @@
assertEquivalent(binding4, (Binding)iter.next());
assertEquivalent(binding5, (Binding)iter.next());
- bindings = office2.listBindingsForCondition("topic1");
+ bindings = office2.listAllBindingsForCondition("topic1");
assertNotNull(bindings);
assertEquals(3, bindings.size());
@@ -238,7 +237,7 @@
assertEquivalent(binding4, (Binding)iter.next());
assertEquivalent(binding5, (Binding)iter.next());
- bindings = office3.listBindingsForCondition("topic1");
+ bindings = office3.listAllBindingsForCondition("topic1");
assertNotNull(bindings);
assertEquals(3, bindings.size());
@@ -261,7 +260,7 @@
// Make sure all nodes pick them up
- bindings = office1.listBindingsForCondition("topic1");
+ bindings = office1.listAllBindingsForCondition("topic1");
assertNotNull(bindings);
assertEquals(5, bindings.size());
@@ -272,7 +271,7 @@
assertEquivalent(binding6, (Binding)iter.next());
assertEquivalent(binding7, (Binding)iter.next());
- bindings = office2.listBindingsForCondition("topic1");
+ bindings = office2.listAllBindingsForCondition("topic1");
assertNotNull(bindings);
assertEquals(5, bindings.size());
@@ -283,7 +282,7 @@
assertEquivalent(binding6, (Binding)iter.next());
assertEquivalent(binding7, (Binding)iter.next());
- bindings = office3.listBindingsForCondition("topic1");
+ bindings = office3.listAllBindingsForCondition("topic1");
assertNotNull(bindings);
assertEquals(5, bindings.size());
@@ -305,7 +304,7 @@
//All it's non durable bindings should be removed from the other nodes
//Durable bindings should remain
- bindings = office2.listBindingsForCondition("topic1");
+ bindings = office2.listAllBindingsForCondition("topic1");
assertNotNull(bindings);
assertEquals(4, bindings.size());
@@ -315,7 +314,7 @@
assertEquivalent(binding5, (Binding)iter.next());
assertEquivalent(binding6, (Binding)iter.next());
- bindings = office3.listBindingsForCondition("topic1");
+ bindings = office3.listAllBindingsForCondition("topic1");
assertNotNull(bindings);
assertEquals(4, bindings.size());
@@ -328,7 +327,7 @@
//Stop office 2
office2.stop();
- bindings = office3.listBindingsForCondition("topic1");
+ bindings = office3.listAllBindingsForCondition("topic1");
assertNotNull(bindings);
assertEquals(2, bindings.size());
@@ -341,7 +340,7 @@
office2 = createClusteredPostOffice(2, "testgroup");
- bindings = office1.listBindingsForCondition("topic1");
+ bindings = office1.listAllBindingsForCondition("topic1");
assertNotNull(bindings);
assertEquals(2, bindings.size());
@@ -349,7 +348,7 @@
assertEquivalent(binding5, (Binding)iter.next());
assertEquivalent(binding6, (Binding)iter.next());
- bindings = office2.listBindingsForCondition("topic1");
+ bindings = office2.listAllBindingsForCondition("topic1");
assertNotNull(bindings);
assertEquals(2, bindings.size());
@@ -357,7 +356,7 @@
assertEquivalent(binding5, (Binding)iter.next());
assertEquivalent(binding6, (Binding)iter.next());
- bindings = office3.listBindingsForCondition("topic1");
+ bindings = office3.listAllBindingsForCondition("topic1");
assertNotNull(bindings);
assertEquals(2, bindings.size());
@@ -378,21 +377,21 @@
//Only the durable queue should survive
- bindings = office1.listBindingsForCondition("topic1");
+ bindings = office1.listAllBindingsForCondition("topic1");
assertNotNull(bindings);
assertEquals(1, bindings.size());
iter = bindings.iterator();
assertEquivalent(binding6, (Binding)iter.next());
- bindings = office2.listBindingsForCondition("topic1");
+ bindings = office2.listAllBindingsForCondition("topic1");
assertNotNull(bindings);
assertEquals(1, bindings.size());
iter = bindings.iterator();
assertEquivalent(binding6, (Binding)iter.next());
- bindings = office3.listBindingsForCondition("topic1");
+ bindings = office3.listAllBindingsForCondition("topic1");
assertNotNull(bindings);
assertEquals(1, bindings.size());
@@ -540,11 +539,11 @@
LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, 1, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
Binding binding1 = office1.bindClusteredQueue("queue1", queue1);
-
+
LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, 2, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
-
+
Binding binding2 = office2.bindClusteredQueue("queue1", queue2);
-
+
LocalClusteredQueue queue3 = new LocalClusteredQueue(office1, 1, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
try
@@ -568,10 +567,10 @@
//Ok
}
- office1.unbindClusteredQueue("queue1");
-
office2.unbindClusteredQueue("queue1");
-
+
+ office1.unbindClusteredQueue("queue1");
+
LocalClusteredQueue queue5 = new LocalClusteredQueue(office1, 1, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
Binding binding5 = office1.bindClusteredQueue("queue1", queue5);
@@ -1319,7 +1318,7 @@
checkContainsAndAcknowledge(msgs, receiver13, nonDurable7);
//n7
- checkEmpty(receiver12);
+ checkEmpty(receiver14);
//Send 1 message at node2
@@ -1351,7 +1350,7 @@
checkContainsAndAcknowledge(msgs, receiver13, nonDurable7);
//n7
- checkEmpty(receiver12);
+ checkEmpty(receiver14);
//Send 1 message at node3
//========================
@@ -1382,12 +1381,12 @@
checkContainsAndAcknowledge(msgs, receiver13, nonDurable7);
//n7
- checkEmpty(receiver12);
+ checkEmpty(receiver14);
//Send 1 message at node4
//========================
-
- msgs = sendMessages("topic", persistent, office4, 1, null);
+
+ msgs = sendMessages("topic", persistent, office4, 1, null);
//n2
checkContainsAndAcknowledge(msgs, receiver1, nonDurable1);
@@ -1398,22 +1397,22 @@
checkContainsAndAcknowledge(msgs, receiver4, nonDurable3);
//n4
- checkContainsAndAcknowledge(msgs, receiver5, sharedDurable1);
+ checkContainsAndAcknowledge(msgs, receiver5, sharedDurable1); // shared durable 1
checkContainsAndAcknowledge(msgs, receiver6, nonSharedDurable2);
checkContainsAndAcknowledge(msgs, receiver7, nonDurable4);
checkContainsAndAcknowledge(msgs, receiver8, nonDurable5);
checkContainsAndAcknowledge(msgs, receiver9, nonDurable6);
//n5
- checkEmpty(receiver10);
- checkContainsAndAcknowledge(msgs, receiver11, sharedDurable3);
+ checkEmpty(receiver10); //shared durable 1
+ checkContainsAndAcknowledge(msgs, receiver11, sharedDurable3); //shared durable 2
//n6
- checkEmpty(receiver12);
- checkContainsAndAcknowledge(msgs, receiver13, nonDurable7);
+ checkEmpty(receiver12); // shared durable 2
+ checkContainsAndAcknowledge(msgs, receiver13, nonDurable7);
//n7
- checkEmpty(receiver12);
+ checkEmpty(receiver14);
//Send 1 message at node5
//========================
Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultRouterTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultRouterTest.java 2006-10-13 12:09:35 UTC (rev 1473)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultRouterTest.java 2006-10-16 22:03:03 UTC (rev 1474)
@@ -87,6 +87,45 @@
super.tearDown();
}
+ public void testSize() throws Exception
+ {
+ DefaultRouter dr = new DefaultRouter();
+
+ ClusteredQueue queue1 = new SimpleQueue(true);
+ dr.add(queue1);
+
+ assertEquals(1, dr.numberOfReceivers());
+ assertEquals(1, dr.getQueues().size());
+
+ ClusteredQueue queue2 = new SimpleQueue(false);
+ dr.add(queue2);
+
+ assertEquals(2, dr.numberOfReceivers());
+ assertEquals(2, dr.getQueues().size());
+
+ ClusteredQueue queue3 = new SimpleQueue(false);
+ dr.add(queue3);
+
+ assertEquals(3, dr.numberOfReceivers());
+ assertEquals(3, dr.getQueues().size());
+
+ dr.remove(queue3);
+
+ assertEquals(2, dr.numberOfReceivers());
+ assertEquals(2, dr.getQueues().size());
+
+ dr.remove(queue2);
+
+ assertEquals(1, dr.numberOfReceivers());
+ assertEquals(1, dr.getQueues().size());
+
+ dr.remove(queue1);
+
+ assertEquals(0, dr.numberOfReceivers());
+ assertTrue(dr.getQueues().isEmpty());
+
+ }
+
// The router only has a local queue
public void testRouterOnlyLocal() throws Exception
{
Added: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RedistributionTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RedistributionTest.java 2006-10-13 12:09:35 UTC (rev 1473)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RedistributionTest.java 2006-10-16 22:03:03 UTC (rev 1474)
@@ -0,0 +1,936 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.test.messaging.core.plugin.postoffice.cluster;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.jboss.messaging.core.Channel;
+import org.jboss.messaging.core.Delivery;
+import org.jboss.messaging.core.DeliveryObserver;
+import org.jboss.messaging.core.FilterFactory;
+import org.jboss.messaging.core.Message;
+import org.jboss.messaging.core.MessageReference;
+import org.jboss.messaging.core.Receiver;
+import org.jboss.messaging.core.SimpleDelivery;
+import org.jboss.messaging.core.plugin.contract.ClusteredPostOffice;
+import org.jboss.messaging.core.plugin.postoffice.Binding;
+import org.jboss.messaging.core.plugin.postoffice.cluster.ClusterRouterFactory;
+import org.jboss.messaging.core.plugin.postoffice.cluster.DefaultClusteredPostOffice;
+import org.jboss.messaging.core.plugin.postoffice.cluster.DefaultMessagePullPolicy;
+import org.jboss.messaging.core.plugin.postoffice.cluster.DefaultRouterFactory;
+import org.jboss.messaging.core.plugin.postoffice.cluster.LocalClusteredQueue;
+import org.jboss.messaging.core.plugin.postoffice.cluster.MessagePullPolicy;
+import org.jboss.messaging.core.tx.Transaction;
+import org.jboss.test.messaging.core.SimpleFilterFactory;
+import org.jboss.test.messaging.core.SimpleReceiver;
+import org.jboss.test.messaging.core.plugin.base.ClusteringTestBase;
+
+import EDU.oswego.cs.dl.util.concurrent.Executor;
+import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
+
+
+public class RedistributionTest extends ClusteringTestBase
+{
+ // Constants -----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public RedistributionTest(String name)
+ {
+ super(name);
+ }
+
+ // Public --------------------------------------------------------
+
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ }
+
+ public void tearDown() throws Exception
+ {
+ super.tearDown();
+ }
+
+
+ public void testConsumeAllNonPersistentNonRecoverable() throws Throwable
+ {
+ consumeAll(false, false);
+ }
+
+ public void testConsumeAllPersistentNonRecoverable() throws Throwable
+ {
+ consumeAll(true, false);
+ }
+
+ public void testConsumeAllNonPersistentRecoverable() throws Throwable
+ {
+ consumeAll(false, true);
+ }
+
+ public void testConsumeAllPersistentRecoverable() throws Throwable
+ {
+ consumeAll(true, true);
+ }
+
+
+
+ public void testConsumeBitByBitNonPersistentNonRecoverable() throws Throwable
+ {
+ consumeBitByBit(false, false);
+ }
+
+ public void testConsumeBitByBitPersistentNonRecoverable() throws Throwable
+ {
+ consumeBitByBit(true, false);
+ }
+
+ public void testConsumeBitByBitNonPersistentRecoverable() throws Throwable
+ {
+ consumeBitByBit(false, true);
+ }
+
+ public void testConsumeBitByBitPersistentRecoverable() throws Throwable
+ {
+ consumeBitByBit(true, true);
+ }
+
+
+
+
+//
+// public void testConsumeConcurrentlyNonPersistentNonRecoverable() throws Throwable
+// {
+// consumeConcurrently(false, false);
+// }
+//
+// public void testConsumeConsumeConcurrentlyPersistentNonRecoverable() throws Throwable
+// {
+// consumeConcurrently(true, false);
+// }
+//
+// public void testConsumeConsumeConcurrentlyNonPersistentRecoverable() throws Throwable
+// {
+// consumeConcurrently(false, true);
+// }
+//
+// public void testConsumeConsumeConcurrentlyPersistentRecoverable() throws Throwable
+// {
+// consumeConcurrently(true, true);
+// }
+
+ protected void consumeAll(boolean persistent, boolean recoverable) throws Throwable
+ {
+ DefaultClusteredPostOffice office1 = null;
+
+ DefaultClusteredPostOffice office2 = null;
+
+ DefaultClusteredPostOffice office3 = null;
+
+ DefaultClusteredPostOffice office4 = null;
+
+ DefaultClusteredPostOffice office5 = null;
+
+ try
+ {
+ office1 = (DefaultClusteredPostOffice)createClusteredPostOffice(1, "testgroup");
+
+ office2 = (DefaultClusteredPostOffice)createClusteredPostOffice(2, "testgroup");
+
+ office3 = (DefaultClusteredPostOffice)createClusteredPostOffice(3, "testgroup");
+
+ office4 = (DefaultClusteredPostOffice)createClusteredPostOffice(4, "testgroup");
+
+ office5 = (DefaultClusteredPostOffice)createClusteredPostOffice(5, "testgroup");
+
+ LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, 1, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);
+ Binding binding1 = office1.bindClusteredQueue("queue1", queue1);
+
+ LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, 2, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);
+ Binding binding2 = office2.bindClusteredQueue("queue1", queue2);
+
+ LocalClusteredQueue queue3 = new LocalClusteredQueue(office3, 3, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);
+ Binding binding3 = office3.bindClusteredQueue("queue1", queue3);
+
+ LocalClusteredQueue queue4 = new LocalClusteredQueue(office4, 4, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);
+ Binding binding4 = office4.bindClusteredQueue("queue1", queue4);
+
+ LocalClusteredQueue queue5 = new LocalClusteredQueue(office5, 5, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);
+ Binding binding5 = office5.bindClusteredQueue("queue1", queue5);
+
+ final int NUM_MESSAGES = 100;
+
+ this.sendMessages("queue1", persistent, office1, NUM_MESSAGES, null);
+ this.sendMessages("queue1", persistent, office2, NUM_MESSAGES, null);
+ this.sendMessages("queue1", persistent, office3, NUM_MESSAGES, null);
+ this.sendMessages("queue1", persistent, office4, NUM_MESSAGES, null);
+ this.sendMessages("queue1", persistent, office5, NUM_MESSAGES, null);
+
+ Thread.sleep(2000);
+
+ //Check the sizes
+
+ log.info("Here are the sizes:");
+ log.info("queue1, refs:" + queue1.memoryRefCount() + " dels:" + queue1.memoryDeliveryCount());
+ log.info("queue2, refs:" + queue2.memoryRefCount() + " dels:" + queue2.memoryDeliveryCount());
+ log.info("queue3, refs:" + queue3.memoryRefCount() + " dels:" + queue3.memoryDeliveryCount());
+ log.info("queue4, refs:" + queue4.memoryRefCount() + " dels:" + queue4.memoryDeliveryCount());
+ log.info("queue5, refs:" + queue5.memoryRefCount() + " dels:" + queue5.memoryDeliveryCount());
+
+ assertEquals(NUM_MESSAGES, queue1.memoryRefCount());
+ assertEquals(0, queue1.memoryDeliveryCount());
+
+ assertEquals(NUM_MESSAGES, queue2.memoryRefCount());
+ assertEquals(0, queue2.memoryDeliveryCount());
+
+ assertEquals(NUM_MESSAGES, queue3.memoryRefCount());
+ assertEquals(0, queue3.memoryDeliveryCount());
+
+ assertEquals(NUM_MESSAGES, queue4.memoryRefCount());
+ assertEquals(0, queue4.memoryDeliveryCount());
+
+ assertEquals(NUM_MESSAGES, queue5.memoryRefCount());
+ assertEquals(0, queue5.memoryDeliveryCount());
+
+ SimpleReceiver receiver = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+
+ queue1.add(receiver);
+
+ queue1.deliver(false);
+
+ Thread.sleep(7000);
+
+ log.info("Here are the sizes:");
+ log.info("queue1, refs:" + queue1.memoryRefCount() + " dels:" + queue1.memoryDeliveryCount());
+ log.info("queue2, refs:" + queue2.memoryRefCount() + " dels:" + queue2.memoryDeliveryCount());
+ log.info("queue3, refs:" + queue3.memoryRefCount() + " dels:" + queue3.memoryDeliveryCount());
+ log.info("queue4, refs:" + queue4.memoryRefCount() + " dels:" + queue4.memoryDeliveryCount());
+ log.info("queue5, refs:" + queue5.memoryRefCount() + " dels:" + queue5.memoryDeliveryCount());
+
+ assertEquals(0, queue1.memoryRefCount());
+ assertEquals(NUM_MESSAGES * 5, queue1.memoryDeliveryCount());
+
+ assertEquals(0, queue2.memoryRefCount());
+ assertEquals(0, queue2.memoryDeliveryCount());
+
+ assertEquals(0, queue3.memoryRefCount());
+ assertEquals(0, queue3.memoryDeliveryCount());
+
+ assertEquals(0, queue4.memoryRefCount());
+ assertEquals(0, queue4.memoryDeliveryCount());
+
+ assertEquals(0, queue5.memoryRefCount());
+ assertEquals(0, queue5.memoryDeliveryCount());
+
+ List messages = receiver.getMessages();
+
+ assertNotNull(messages);
+
+ assertEquals(NUM_MESSAGES * 5, messages.size());
+
+ Iterator iter = messages.iterator();
+
+ while (iter.hasNext())
+ {
+ Message msg = (Message)iter.next();
+
+ receiver.acknowledge(msg, null);
+ }
+
+ receiver.clear();
+
+ assertEquals(0, queue1.memoryRefCount());
+ assertEquals(0, queue1.memoryDeliveryCount());
+
+ assertTrue(office1.getHoldingTransactions().isEmpty());
+ assertTrue(office2.getHoldingTransactions().isEmpty());
+ assertTrue(office3.getHoldingTransactions().isEmpty());
+ assertTrue(office4.getHoldingTransactions().isEmpty());
+ assertTrue(office5.getHoldingTransactions().isEmpty());
+
+ checkNoMessageData();
+ }
+ finally
+ {
+ if (office1 != null)
+ {
+ office1.stop();
+ }
+
+ if (office2 != null)
+ {
+ office2.stop();
+ }
+
+ if (office3 != null)
+ {
+ office3.stop();
+ }
+
+ if (office4 != null)
+ {
+ office4.stop();
+ }
+
+ if (office5 != null)
+ {
+ office5.stop();
+ }
+ }
+ }
+
+ protected void consumeBitByBit(boolean persistent, boolean recoverable) throws Throwable
+ {
+ DefaultClusteredPostOffice office1 = null;
+
+ DefaultClusteredPostOffice office2 = null;
+
+ DefaultClusteredPostOffice office3 = null;
+
+ DefaultClusteredPostOffice office4 = null;
+
+ DefaultClusteredPostOffice office5 = null;
+
+ try
+ {
+ office1 = (DefaultClusteredPostOffice)createClusteredPostOffice(1, "testgroup");
+
+ office2 = (DefaultClusteredPostOffice)createClusteredPostOffice(2, "testgroup");
+
+ office3 = (DefaultClusteredPostOffice)createClusteredPostOffice(3, "testgroup");
+
+ office4 = (DefaultClusteredPostOffice)createClusteredPostOffice(4, "testgroup");
+
+ office5 = (DefaultClusteredPostOffice)createClusteredPostOffice(5, "testgroup");
+
+ LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, 1, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);
+ Binding binding1 = office1.bindClusteredQueue("queue1", queue1);
+
+ LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, 2, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);
+ Binding binding2 = office2.bindClusteredQueue("queue1", queue2);
+
+ LocalClusteredQueue queue3 = new LocalClusteredQueue(office3, 3, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);
+ Binding binding3 = office3.bindClusteredQueue("queue1", queue3);
+
+ LocalClusteredQueue queue4 = new LocalClusteredQueue(office4, 4, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);
+ Binding binding4 = office4.bindClusteredQueue("queue1", queue4);
+
+ LocalClusteredQueue queue5 = new LocalClusteredQueue(office5, 5, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);
+ Binding binding5 = office5.bindClusteredQueue("queue1", queue5);
+
+ final int NUM_MESSAGES = 100;
+
+ this.sendMessages("queue1", persistent, office1, NUM_MESSAGES, null);
+ this.sendMessages("queue1", persistent, office2, NUM_MESSAGES, null);
+ this.sendMessages("queue1", persistent, office3, NUM_MESSAGES, null);
+ this.sendMessages("queue1", persistent, office4, NUM_MESSAGES, null);
+ this.sendMessages("queue1", persistent, office5, NUM_MESSAGES, null);
+
+ Thread.sleep(2000);
+
+ //Check the sizes
+
+ log.info("Here are the sizes 1:");
+ log.info("queue1, refs:" + queue1.memoryRefCount() + " dels:" + queue1.memoryDeliveryCount());
+ log.info("queue2, refs:" + queue2.memoryRefCount() + " dels:" + queue2.memoryDeliveryCount());
+ log.info("queue3, refs:" + queue3.memoryRefCount() + " dels:" + queue3.memoryDeliveryCount());
+ log.info("queue4, refs:" + queue4.memoryRefCount() + " dels:" + queue4.memoryDeliveryCount());
+ log.info("queue5, refs:" + queue5.memoryRefCount() + " dels:" + queue5.memoryDeliveryCount());
+
+ assertEquals(NUM_MESSAGES, queue1.memoryRefCount());
+ assertEquals(0, queue1.memoryDeliveryCount());
+
+ assertEquals(NUM_MESSAGES, queue2.memoryRefCount());
+ assertEquals(0, queue2.memoryDeliveryCount());
+
+ assertEquals(NUM_MESSAGES, queue3.memoryRefCount());
+ assertEquals(0, queue3.memoryDeliveryCount());
+
+ assertEquals(NUM_MESSAGES, queue4.memoryRefCount());
+ assertEquals(0, queue4.memoryDeliveryCount());
+
+ assertEquals(NUM_MESSAGES, queue5.memoryRefCount());
+ assertEquals(0, queue5.memoryDeliveryCount());
+
+ assertTrue(office1.getHoldingTransactions().isEmpty());
+ assertTrue(office2.getHoldingTransactions().isEmpty());
+ assertTrue(office3.getHoldingTransactions().isEmpty());
+ assertTrue(office4.getHoldingTransactions().isEmpty());
+ assertTrue(office5.getHoldingTransactions().isEmpty());
+
+ SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING_TO_MAX);
+ queue1.add(receiver1);
+ SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING_TO_MAX);
+ queue2.add(receiver2);
+ SimpleReceiver receiver3 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING_TO_MAX);
+ queue3.add(receiver3);
+ SimpleReceiver receiver4 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING_TO_MAX);
+ queue4.add(receiver4);
+ SimpleReceiver receiver5 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING_TO_MAX);
+ queue5.add(receiver5);
+
+ receiver1.setMaxRefs(5);
+ queue1.deliver(false);
+ Thread.sleep(1000);
+ assertEquals(NUM_MESSAGES - 5, queue1.memoryRefCount());
+ assertEquals(5, queue1.memoryDeliveryCount());
+
+ acknowledgeAll(receiver1);
+ assertEquals(0, queue1.memoryDeliveryCount());
+ receiver1.setMaxRefs(0);
+
+ receiver2.setMaxRefs(10);
+ queue2.deliver(false);
+ Thread.sleep(1000);
+ assertEquals(NUM_MESSAGES - 10, queue2.memoryRefCount());
+ assertEquals(10, queue2.memoryDeliveryCount());
+ acknowledgeAll(receiver2);
+ receiver2.setMaxRefs(0);
+
+ receiver3.setMaxRefs(15);
+ queue3.deliver(false);
+ Thread.sleep(1000);
+ assertEquals(NUM_MESSAGES - 15, queue3.memoryRefCount());
+ assertEquals(15, queue3.memoryDeliveryCount());
+ acknowledgeAll(receiver3);
+ receiver3.setMaxRefs(0);
+
+ receiver4.setMaxRefs(20);
+ queue4.deliver(false);
+ Thread.sleep(1000);
+ assertEquals(NUM_MESSAGES - 20, queue4.memoryRefCount());
+ assertEquals(20, queue4.memoryDeliveryCount());
+ acknowledgeAll(receiver4);
+ receiver4.setMaxRefs(0);
+
+ receiver5.setMaxRefs(25);
+ queue5.deliver(false);
+ Thread.sleep(1000);
+ assertEquals(NUM_MESSAGES - 25, queue5.memoryRefCount());
+ assertEquals(25, queue5.memoryDeliveryCount());
+ acknowledgeAll(receiver5);
+ receiver5.setMaxRefs(0);
+
+ Thread.sleep(1000);
+
+ assertTrue(office1.getHoldingTransactions().isEmpty());
+ assertTrue(office2.getHoldingTransactions().isEmpty());
+ assertTrue(office3.getHoldingTransactions().isEmpty());
+ assertTrue(office4.getHoldingTransactions().isEmpty());
+ assertTrue(office5.getHoldingTransactions().isEmpty());
+
+ log.info("Here are the sizes 2:");
+ log.info("queue1, refs:" + queue1.memoryRefCount() + " dels:" + queue1.memoryDeliveryCount());
+ log.info("queue2, refs:" + queue2.memoryRefCount() + " dels:" + queue2.memoryDeliveryCount());
+ log.info("queue3, refs:" + queue3.memoryRefCount() + " dels:" + queue3.memoryDeliveryCount());
+ log.info("queue4, refs:" + queue4.memoryRefCount() + " dels:" + queue4.memoryDeliveryCount());
+ log.info("queue5, refs:" + queue5.memoryRefCount() + " dels:" + queue5.memoryDeliveryCount());
+
+ //Consume the rest from queue 5
+ receiver5.setMaxRefs(NUM_MESSAGES - 25);
+ queue5.deliver(false);
+ Thread.sleep(5000);
+
+ log.info("receiver5 msgs:" + receiver5.getMessages().size());
+
+ log.info("Here are the sizes 3:");
+ log.info("queue1, refs:" + queue1.memoryRefCount() + " dels:" + queue1.memoryDeliveryCount());
+ log.info("queue2, refs:" + queue2.memoryRefCount() + " dels:" + queue2.memoryDeliveryCount());
+ log.info("queue3, refs:" + queue3.memoryRefCount() + " dels:" + queue3.memoryDeliveryCount());
+ log.info("queue4, refs:" + queue4.memoryRefCount() + " dels:" + queue4.memoryDeliveryCount());
+ log.info("queue5, refs:" + queue5.memoryRefCount() + " dels:" + queue5.memoryDeliveryCount());
+
+ //This will result in one extra message being pulled from queue1 - we cannot avoid this.
+ //The channel cannot tell that the receiver is full until it offers it a ref,
+ //so it has to retrieve one more ref before it finds out
+
+ assertEquals(NUM_MESSAGES - 6, queue1.memoryRefCount());
+ assertEquals(0, queue1.memoryDeliveryCount());
+
+ assertEquals(NUM_MESSAGES - 10, queue2.memoryRefCount());
+ assertEquals(0, queue2.memoryDeliveryCount());
+
+ assertEquals(NUM_MESSAGES - 15, queue3.memoryRefCount());
+ assertEquals(0, queue3.memoryDeliveryCount());
+
+ assertEquals(NUM_MESSAGES - 20, queue4.memoryRefCount());
+ assertEquals(0, queue4.memoryDeliveryCount());
+
+ assertEquals(1, queue5.memoryRefCount());
+ assertEquals(NUM_MESSAGES - 25, queue5.memoryDeliveryCount());
+
+ acknowledgeAll(receiver5);
+
+ assertEquals(0, queue5.memoryDeliveryCount());
+
+ receiver5.setMaxRefs(0);
+
+ assertTrue(office1.getHoldingTransactions().isEmpty());
+ assertTrue(office2.getHoldingTransactions().isEmpty());
+ assertTrue(office3.getHoldingTransactions().isEmpty());
+ assertTrue(office4.getHoldingTransactions().isEmpty());
+ assertTrue(office5.getHoldingTransactions().isEmpty());
+
+ //Now consume 5 more from queue5; they should come from queue1, which has the most messages
+
+ log.info("Consume 5 more from queue 5");
+
+ receiver5.setMaxRefs(5);
+ queue5.deliver(false);
+ Thread.sleep(3000);
+
+ log.info("Here are the sizes 4:");
+ log.info("queue1, refs:" + queue1.memoryRefCount() + " dels:" + queue1.memoryDeliveryCount());
+ log.info("queue2, refs:" + queue2.memoryRefCount() + " dels:" + queue2.memoryDeliveryCount());
+ log.info("queue3, refs:" + queue3.memoryRefCount() + " dels:" + queue3.memoryDeliveryCount());
+ log.info("queue4, refs:" + queue4.memoryRefCount() + " dels:" + queue4.memoryDeliveryCount());
+ log.info("queue5, refs:" + queue5.memoryRefCount() + " dels:" + queue5.memoryDeliveryCount());
+
+ assertEquals(NUM_MESSAGES - 11, queue1.memoryRefCount());
+
+ assertEquals(0, queue1.memoryDeliveryCount());
+
+ assertEquals(NUM_MESSAGES - 10, queue2.memoryRefCount());
+ assertEquals(0, queue2.memoryDeliveryCount());
+
+ assertEquals(NUM_MESSAGES - 15, queue3.memoryRefCount());
+ assertEquals(0, queue3.memoryDeliveryCount());
+
+ assertEquals(NUM_MESSAGES - 20, queue4.memoryRefCount());
+ assertEquals(0, queue4.memoryDeliveryCount());
+
+ assertEquals(1, queue5.memoryRefCount());
+ assertEquals(5, queue5.memoryDeliveryCount());
+
+ acknowledgeAll(receiver5);
+
+ assertEquals(0, queue5.memoryDeliveryCount());
+
+ receiver1.setMaxRefs(0);
+
+ assertTrue(office1.getHoldingTransactions().isEmpty());
+ assertTrue(office2.getHoldingTransactions().isEmpty());
+ assertTrue(office3.getHoldingTransactions().isEmpty());
+ assertTrue(office4.getHoldingTransactions().isEmpty());
+ assertTrue(office5.getHoldingTransactions().isEmpty());
+
+ //Consume 1 more - should pull one from queue2
+
+ receiver5.setMaxRefs(1);
+ queue5.deliver(false);
+ Thread.sleep(2000);
+
+ log.info("Here are the sizes 5:");
+ log.info("queue1, refs:" + queue1.memoryRefCount() + " dels:" + queue1.memoryDeliveryCount());
+ log.info("queue2, refs:" + queue2.memoryRefCount() + " dels:" + queue2.memoryDeliveryCount());
+ log.info("queue3, refs:" + queue3.memoryRefCount() + " dels:" + queue3.memoryDeliveryCount());
+ log.info("queue4, refs:" + queue4.memoryRefCount() + " dels:" + queue4.memoryDeliveryCount());
+ log.info("queue5, refs:" + queue5.memoryRefCount() + " dels:" + queue5.memoryDeliveryCount());
+
+ assertEquals(NUM_MESSAGES - 11, queue1.memoryRefCount());
+ assertEquals(0, queue1.memoryDeliveryCount());
+
+ assertEquals(NUM_MESSAGES - 11, queue2.memoryRefCount());
+ assertEquals(0, queue2.memoryDeliveryCount());
+
+ assertEquals(NUM_MESSAGES - 15, queue3.memoryRefCount());
+ assertEquals(0, queue3.memoryDeliveryCount());
+
+ assertEquals(NUM_MESSAGES - 20, queue4.memoryRefCount());
+ assertEquals(0, queue4.memoryDeliveryCount());
+
+ assertEquals(1, queue5.memoryRefCount());
+ assertEquals(1, queue5.memoryDeliveryCount());
+
+ acknowledgeAll(receiver5);
+
+ assertEquals(0, queue5.memoryDeliveryCount());
+
+ receiver5.setMaxRefs(0);
+
+ assertTrue(office1.getHoldingTransactions().isEmpty());
+ assertTrue(office2.getHoldingTransactions().isEmpty());
+ assertTrue(office3.getHoldingTransactions().isEmpty());
+ assertTrue(office4.getHoldingTransactions().isEmpty());
+ assertTrue(office5.getHoldingTransactions().isEmpty());
+
+ //From queue 4 consume everything else
+
+ receiver4.setMaxRefs(NUM_MESSAGES - 15 + NUM_MESSAGES - 20 + NUM_MESSAGES - 11 + NUM_MESSAGES - 11 + 1);
+ queue4.deliver(false);
+ Thread.sleep(7000);
+
+ log.info("Here are the sizes 6:");
+ log.info("queue1, refs:" + queue1.memoryRefCount() + " dels:" + queue1.memoryDeliveryCount());
+ log.info("queue2, refs:" + queue2.memoryRefCount() + " dels:" + queue2.memoryDeliveryCount());
+ log.info("queue3, refs:" + queue3.memoryRefCount() + " dels:" + queue3.memoryDeliveryCount());
+ log.info("queue4, refs:" + queue4.memoryRefCount() + " dels:" + queue4.memoryDeliveryCount());
+ log.info("queue5, refs:" + queue5.memoryRefCount() + " dels:" + queue5.memoryDeliveryCount());
+
+ assertEquals(0, queue1.memoryRefCount());
+ assertEquals(0, queue1.memoryDeliveryCount());
+
+ assertEquals(0, queue2.memoryRefCount());
+ assertEquals(0, queue2.memoryDeliveryCount());
+
+ assertEquals(0, queue3.memoryRefCount());
+ assertEquals(0, queue3.memoryDeliveryCount());
+
+ assertEquals(0, queue4.memoryRefCount());
+ assertEquals(NUM_MESSAGES - 15 + NUM_MESSAGES - 20 + NUM_MESSAGES - 11 + NUM_MESSAGES - 11 + 1, queue4.memoryDeliveryCount());
+
+ assertEquals(0, queue5.memoryRefCount());
+ assertEquals(0, queue5.memoryDeliveryCount());
+
+ acknowledgeAll(receiver4);
+
+ assertEquals(0, queue4.memoryDeliveryCount());
+
+ assertTrue(office1.getHoldingTransactions().isEmpty());
+ assertTrue(office2.getHoldingTransactions().isEmpty());
+ assertTrue(office3.getHoldingTransactions().isEmpty());
+ assertTrue(office4.getHoldingTransactions().isEmpty());
+ assertTrue(office5.getHoldingTransactions().isEmpty());
+
+ checkNoMessageData();
+ }
+ finally
+ {
+ if (office1 != null)
+ {
+ office1.stop();
+ }
+
+ if (office2 != null)
+ {
+ office2.stop();
+ }
+
+ if (office3 != null)
+ {
+ office3.stop();
+ }
+
+ if (office4 != null)
+ {
+ office4.stop();
+ }
+
+ if (office5 != null)
+ {
+ office5.stop();
+ }
+ }
+ }
+
+ protected void consumeConcurrently(boolean persistent, boolean recoverable) throws Throwable
+ {
+ DefaultClusteredPostOffice office1 = null;
+
+ DefaultClusteredPostOffice office2 = null;
+
+ DefaultClusteredPostOffice office3 = null;
+
+ DefaultClusteredPostOffice office4 = null;
+
+ DefaultClusteredPostOffice office5 = null;
+
+ try
+ {
+ office1 = (DefaultClusteredPostOffice)createClusteredPostOffice(1, "testgroup");
+
+ office2 = (DefaultClusteredPostOffice)createClusteredPostOffice(2, "testgroup");
+
+ office3 = (DefaultClusteredPostOffice)createClusteredPostOffice(3, "testgroup");
+
+ office4 = (DefaultClusteredPostOffice)createClusteredPostOffice(4, "testgroup");
+
+ office5 = (DefaultClusteredPostOffice)createClusteredPostOffice(5, "testgroup");
+
+ LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, 1, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);
+ Binding binding1 = office1.bindClusteredQueue("queue1", queue1);
+
+ LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, 2, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);
+ Binding binding2 = office2.bindClusteredQueue("queue1", queue2);
+
+ LocalClusteredQueue queue3 = new LocalClusteredQueue(office3, 3, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);
+ Binding binding3 = office3.bindClusteredQueue("queue1", queue3);
+
+ LocalClusteredQueue queue4 = new LocalClusteredQueue(office4, 4, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);
+ Binding binding4 = office4.bindClusteredQueue("queue1", queue4);
+
+ LocalClusteredQueue queue5 = new LocalClusteredQueue(office5, 5, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);
+ Binding binding5 = office5.bindClusteredQueue("queue1", queue5);
+
+ //Test with no consumers on queue1
+
+ //Two equal consumers on queue2 and queue3
+
+ //Add messages at queue 1
+
+ final int NUM_MESSAGES = 10000;
+
+ this.sendMessages("queue1", persistent, office1, NUM_MESSAGES, null);
+
+ log.info("sent messages");
+
+ Thread.sleep(4000);
+
+ ThrottleReceiver receiver1 = new ThrottleReceiver(queue1, 0, 50);
+ queue1.add(receiver1);
+ queue1.deliver(false);
+
+ ThrottleReceiver receiver2 = new ThrottleReceiver(queue2, 0, 50);
+ queue2.add(receiver2);
+ queue2.deliver(false);
+
+ Thread.sleep(45000);
+
+ log.info("receiver1: " + receiver1.getTotalCount());
+
+ log.info("receiver2: " + receiver2.getTotalCount());
+
+
+ //test1
+
+
+ //No consumer on node 1
+ //Very slow consumer on node 2
+ //
+
+ /*
+ * Test with very fast, infinitely big consumer (i.e. is always ready) on node 1
+ * Fast consumer on node2
+ * Send messages on node 1
+ * Verify all go to node1 consumer
+ *
+ * Test with very fast, not infinitely big consumer (i.e. is not always ready) on node 1
+ * Fast consumer on node2
+ * Send messages on node 1
+ * Verify most go to node1 consumer, some go to node 2
+ *
+ * Test with slow consumer on node 1, Fast consumer on node 2
+ *
+ * Test with no consumer on node 1, consumers on other nodes
+ *
+ * Think up all the other permutations, then estimate, within an error margin,
+ * how many messages should end up on each node.
+ */
+
+
+ checkNoMessageData();
+ }
+ finally
+ {
+ if (office1 != null)
+ {
+ office1.stop();
+ }
+
+ if (office2 != null)
+ {
+ office2.stop();
+ }
+
+ if (office3 != null)
+ {
+ office3.stop();
+ }
+
+ if (office4 != null)
+ {
+ office4.stop();
+ }
+
+ if (office5 != null)
+ {
+ office5.stop();
+ }
+ }
+ }
+
+ class ThrottleReceiver implements Receiver, Runnable
+ {
+ long pause;
+
+ volatile int totalCount;
+
+ int count;
+
+ int maxSize;
+
+ volatile boolean full;
+
+ Executor executor;
+
+ List dels;
+
+ Channel queue;
+
+ int getTotalCount()
+ {
+ return totalCount;
+ }
+
+ ThrottleReceiver(Channel queue, long pause, int maxSize)
+ {
+ this.queue = queue;
+
+ this.pause = pause;
+
+ this.maxSize = maxSize;
+
+ this.executor = new QueuedExecutor();
+
+ this.dels = new ArrayList();
+ }
+
+ public Delivery handle(DeliveryObserver observer, MessageReference reference, Transaction tx)
+ {
+ if (full)
+ {
+ return null;
+ }
+
+ //log.info(this + " got ref");
+
+ //log.info("cnt:" + totalCount);
+
+ SimpleDelivery del = new SimpleDelivery(observer, reference);
+
+ dels.add(del);
+
+ count++;
+
+ totalCount++;
+
+ if (count == maxSize)
+ {
+ full = true;
+
+ count = 0;
+
+ try
+ {
+ executor.execute(this);
+ }
+ catch (InterruptedException e)
+ {
+ //Ignore
+ }
+ }
+
+ return del;
+
+ }
+
+ public void run()
+ {
+ //Simulate processing of messages
+
+ try
+ {
+ Thread.sleep(pause);
+ }
+ catch (InterruptedException e)
+ {
+ //Ignore
+ }
+
+ Iterator iter = dels.iterator();
+
+ while (iter.hasNext())
+ {
+ Delivery del = (Delivery)iter.next();
+
+ try
+ {
+ del.acknowledge(null);
+ }
+ catch (Throwable t)
+ {
+ //Ignore
+ }
+ }
+
+ dels.clear();
+
+ full = false;
+
+ queue.deliver(false);
+ }
+
+ }
+
+ private void acknowledgeAll(SimpleReceiver receiver) throws Throwable
+ {
+ List messages = receiver.getMessages();
+
+ Iterator iter = messages.iterator();
+
+ while (iter.hasNext())
+ {
+ Message msg = (Message)iter.next();
+
+ receiver.acknowledge(msg, null);
+ }
+
+ receiver.clear();
+ }
+
+
+ protected ClusteredPostOffice createClusteredPostOffice(int nodeId, String groupName) throws Exception
+ {
+ MessagePullPolicy pullPolicy = new DefaultMessagePullPolicy();
+
+ FilterFactory ff = new SimpleFilterFactory();
+
+ ClusterRouterFactory rf = new DefaultRouterFactory();
+
+ DefaultClusteredPostOffice postOffice =
+ new DefaultClusteredPostOffice(sc.getDataSource(), sc.getTransactionManager(),
+ null, true, nodeId, "Clustered", ms, pm, tr, ff, pool,
+ groupName,
+ JGroupsUtil.getControlStackProperties(),
+ JGroupsUtil.getDataStackProperties(),
+ 10000, 10000, pullPolicy, rf, 1, 1000);
+
+ postOffice.start();
+
+ return postOffice;
+ }
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
+
+
+
+
Modified: trunk/tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java 2006-10-13 12:09:35 UTC (rev 1473)
+++ trunk/tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java 2006-10-16 22:03:03 UTC (rev 1474)
@@ -1786,12 +1786,12 @@
{
try
{
- log.info("(ThreadCloser)Waiting on monitor to close thread");
+ log.trace("(ThreadCloser)Waiting on monitor to close thread");
synchronized (waitMonitor)
{
waitMonitor.wait();
}
- log.info("(ThreadCloser)Notification received");
+ log.trace("(ThreadCloser)Notification received");
Thread.sleep(timeToSleep);
topicConsumer.close();
@@ -1824,12 +1824,12 @@
{
try
{
- log.info("(ThreadReceiver)Waiting on monitor to close thread");
+ log.trace("(ThreadReceiver)Waiting on monitor to close thread");
synchronized(waitMonitor)
{
waitMonitor.wait();
}
- log.info("(ThreadReceiver)Notification received");
+ log.trace("(ThreadReceiver)Notification received");
t1=System.currentTimeMillis();
receivedObject=topicConsumer.receive(timeToWait);
t2=System.currentTimeMillis();
@@ -1868,7 +1868,7 @@
assertNull(receiver.receivedObject);
- log.info("Elapsed time was " + (receiver.t2-receiver.t1));
+ log.trace("Elapsed time was " + (receiver.t2-receiver.t1));
// We need to make sure the
assertTrue("Receive was supposed to receive a notification before 2 seconds",receiver.t2-receiver.t1<=1500);
Modified: trunk/tests/src/org/jboss/test/messaging/jms/crash/ManualCrashTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/crash/ManualCrashTest.java 2006-10-13 12:09:35 UTC (rev 1473)
+++ trunk/tests/src/org/jboss/test/messaging/jms/crash/ManualCrashTest.java 2006-10-16 22:03:03 UTC (rev 1474)
@@ -173,8 +173,6 @@
assertEquals(count, c);
- log.info("Received message:" + count);
-
count++;
if (count == i)
Modified: trunk/tests/src/org/jboss/test/messaging/jms/server/destination/QueueManagementTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/server/destination/QueueManagementTest.java 2006-10-13 12:09:35 UTC (rev 1473)
+++ trunk/tests/src/org/jboss/test/messaging/jms/server/destination/QueueManagementTest.java 2006-10-16 22:03:03 UTC (rev 1474)
@@ -229,7 +229,6 @@
public void testMessageCountOverFullSize() throws Exception
{
- log.info("** starting testMessageCountOverFullSize");
InitialContext ic = new InitialContext(ServerManagement.getJNDIEnvironment());
ConnectionFactory cf = (ConnectionFactory)ic.lookup("/ConnectionFactory");
Connection conn = null;
@@ -271,13 +270,10 @@
int receivedCount = 0;
- log.info("Starting receiver loop...");
-
while((cons.receive(2000)) != null)
{
receivedCount++;
- log.info(receivedCount + " messages received");
Thread.sleep(500);
int mc = ((Integer)ServerManagement.
getAttribute(destObjectName, "MessageCount")).intValue();
@@ -296,7 +292,6 @@
}
finally
{
- log.info("** leaving testMessageCountOverFullSize");
ServerManagement.undeployQueue("QueueMessageCount2");
if (conn != null)
More information about the jboss-cvs-commits
mailing list