[jboss-cvs] JBoss Messaging SVN: r1487 - in trunk: docs/clustering/en/modules src/etc/server/default/deploy src/etc/xmdesc src/main/org/jboss/messaging/core/plugin src/main/org/jboss/messaging/core/plugin/postoffice/cluster tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Oct 17 13:51:54 EDT 2006
Author: timfox
Date: 2006-10-17 13:51:41 -0400 (Tue, 17 Oct 2006)
New Revision: 1487
Added:
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessageResultRequest.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RollbackPullRequest.java
Removed:
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesResultRequest.java
Modified:
trunk/docs/clustering/en/modules/configuration.xml
trunk/src/etc/server/default/deploy/clustered-mysql-persistence-service.xml
trunk/src/etc/xmdesc/ClusteredPostOffice-xmbean.xml
trunk/src/main/org/jboss/messaging/core/plugin/ClusteredPostOfficeService.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusterRequest.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PostOfficeInternal.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesRequest.java
trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeTest.java
trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeWithDefaultRouterTest.java
trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultRouterTest.java
trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RecoveryTest.java
trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RedistributionWithDefaultMessagePullPolicyTest.java
Log:
http://jira.jboss.com/jira/browse/JBMESSAGING-575
Modified: trunk/docs/clustering/en/modules/configuration.xml
===================================================================
--- trunk/docs/clustering/en/modules/configuration.xml 2006-10-17 16:41:04 UTC (rev 1486)
+++ trunk/docs/clustering/en/modules/configuration.xml 2006-10-17 17:51:41 UTC (rev 1487)
@@ -78,7 +78,6 @@
<attribute name="GroupName">Topic</attribute>
<attribute name="StateTimeout">5000</attribute>
<attribute name="CastTimeout">5000</attribute>
- <attribute name="PullSize">1</attribute>
<attribute name="StatsSendPeriod">10000</attribute>
<attribute name="MessagePullPolicy">org.jboss.messaging.core.plugin.postoffice.cluster.NullMessagePullPolicy</attribute>
<attribute name="ClusterRouterFactory">org.jboss.messaging.core.plugin.postoffice.cluster.DefaultRouterFactory</attribute>
@@ -150,13 +149,6 @@
</para>
</section>
- <section id="conf.pullsize">
- <title>PullSize</title>
- <para>
- The maximum number of messages to pull from one node to another when the local node is starving. Defaults to 1.
- </para>
- </section>
-
<section id="conf.statssend">
<title>StatsSendPeriod</title>
<para>
Modified: trunk/src/etc/server/default/deploy/clustered-mysql-persistence-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/clustered-mysql-persistence-service.xml 2006-10-17 16:41:04 UTC (rev 1486)
+++ trunk/src/etc/server/default/deploy/clustered-mysql-persistence-service.xml 2006-10-17 17:51:41 UTC (rev 1487)
@@ -75,7 +75,6 @@
<attribute name="GroupName">Queue</attribute>
<attribute name="StateTimeout">5000</attribute>
<attribute name="CastTimeout">5000</attribute>
- <attribute name="PullSize">1</attribute>
<attribute name="StatsSendPeriod">10000</attribute>
<attribute name="MessagePullPolicy">org.jboss.messaging.core.plugin.postoffice.cluster.NullMessagePullPolicy</attribute>
<attribute name="ClusterRouterFactory">org.jboss.messaging.core.plugin.postoffice.cluster.DefaultRouterFactory</attribute>
@@ -140,7 +139,6 @@
<attribute name="GroupName">Topic</attribute>
<attribute name="StateTimeout">5000</attribute>
<attribute name="CastTimeout">5000</attribute>
- <attribute name="PullSize">1</attribute>
<attribute name="StatsSendPeriod">10000</attribute>
<attribute name="MessagePullPolicy">org.jboss.messaging.core.plugin.postoffice.cluster.NullMessagePullPolicy</attribute>
<attribute name="ClusterRouterFactory">org.jboss.messaging.core.plugin.postoffice.cluster.DefaultRouterFactory</attribute>
Modified: trunk/src/etc/xmdesc/ClusteredPostOffice-xmbean.xml
===================================================================
--- trunk/src/etc/xmdesc/ClusteredPostOffice-xmbean.xml 2006-10-17 16:41:04 UTC (rev 1486)
+++ trunk/src/etc/xmdesc/ClusteredPostOffice-xmbean.xml 2006-10-17 17:51:41 UTC (rev 1487)
@@ -83,12 +83,6 @@
<type>long</type>
</attribute>
- <attribute access="read-write" getMethod="getPullSize" setMethod="setPullSize">
- <description>The maximum number of message to pull in one go from a remote queue when the local queue consumers are starving</description>
- <name>PullSize</name>
- <type>int</type>
- </attribute>
-
<attribute access="read-write" getMethod="getStatsSendPeriod" setMethod="setStatsSendPeriod">
<description>The period in milliseconds between a post office casting it's statistics across the cluster</description>
<name>StatsSendPeriod</name>
Modified: trunk/src/main/org/jboss/messaging/core/plugin/ClusteredPostOfficeService.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/ClusteredPostOfficeService.java 2006-10-17 16:41:04 UTC (rev 1486)
+++ trunk/src/main/org/jboss/messaging/core/plugin/ClusteredPostOfficeService.java 2006-10-17 17:51:41 UTC (rev 1487)
@@ -69,8 +69,6 @@
private String groupName;
- private int pullSize = 1;
-
private long statsSendPeriod = 1000;
private String clusterRouterFactory;
@@ -172,16 +170,6 @@
return groupName;
}
- public void setPullSize(int size)
- {
- this.pullSize = size;
- }
-
- public int getPullSize()
- {
- return pullSize;
- }
-
public void setStatsSendPeriod(long period)
{
this.statsSendPeriod = period;
@@ -256,7 +244,6 @@
syncChannelConfig, asyncChannelConfig,
stateTimeout, castTimeout,
pullPolicy, rf,
- pullSize,
statsSendPeriod);
postOffice.start();
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusterRequest.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusterRequest.java 2006-10-17 16:41:04 UTC (rev 1486)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusterRequest.java 2006-10-17 17:51:41 UTC (rev 1487)
@@ -54,9 +54,9 @@
request = new BindRequest();
break;
}
- case PullMessagesResultRequest.TYPE:
+ case PullMessageResultRequest.TYPE:
{
- request = new PullMessagesResultRequest();
+ request = new PullMessageResultRequest();
break;
}
case MessageRequest.TYPE:
@@ -94,6 +94,11 @@
request = new UnbindRequest();
break;
}
+ case RollbackPullRequest.TYPE:
+ {
+ request = new RollbackPullRequest();
+ break;
+ }
default:
{
throw new IllegalArgumentException("Invalid type: " + type);
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java 2006-10-17 16:41:04 UTC (rev 1486)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java 2006-10-17 17:51:41 UTC (rev 1487)
@@ -84,6 +84,7 @@
//Used for failure testing
private boolean failBeforeCommit;
private boolean failAfterCommit;
+ private boolean failHandleResult;
private boolean trace = log.isTraceEnabled();
@@ -130,8 +131,6 @@
private ClusterRouterFactory routerFactory;
- private int pullSize;
-
private Map routerMap;
private StatsSender statsSender;
@@ -166,12 +165,11 @@
long stateTimeout, long castTimeout,
MessagePullPolicy redistributionPolicy,
ClusterRouterFactory rf,
- int pullSize,
long statsSendPeriod) throws Exception
{
this(ds, tm, sqlProperties, createTablesOnStartup, nodeId, officeName, ms,
pm, tr, filterFactory, pool, groupName, stateTimeout, castTimeout, redistributionPolicy,
- rf, pullSize, statsSendPeriod);
+ rf, statsSendPeriod);
this.syncChannelConfigE = syncChannelConfig;
this.asyncChannelConfigE = asyncChannelConfig;
@@ -193,12 +191,11 @@
long stateTimeout, long castTimeout,
MessagePullPolicy redistributionPolicy,
ClusterRouterFactory rf,
- int pullSize,
long statsSendPeriod) throws Exception
{
this(ds, tm, sqlProperties, createTablesOnStartup, nodeId, officeName, ms,
pm, tr, filterFactory, pool, groupName, stateTimeout, castTimeout, redistributionPolicy,
- rf, pullSize, statsSendPeriod);
+ rf, statsSendPeriod);
this.syncChannelConfigS = syncChannelConfig;
this.asyncChannelConfigS = asyncChannelConfig;
@@ -215,7 +212,6 @@
long stateTimeout, long castTimeout,
MessagePullPolicy redistributionPolicy,
ClusterRouterFactory rf,
- int pullSize,
long statsSendPeriod)
{
super (ds, tm, sqlProperties, createTablesOnStartup, nodeId, officeName, ms, pm, tr, filterFactory,
@@ -233,8 +229,6 @@
this.routerFactory = rf;
- this.pullSize = pullSize;
-
routerMap = new HashMap();
statsSender = new StatsSender(this, statsSendPeriod);
@@ -795,6 +789,27 @@
if (trace) { log.trace(this.nodeId + " committed transaction " + id ); }
}
+ /**
+ * Rolls back the transaction held in the holding area under the given id.
+ *
+ * The transaction is removed from the holding area (under the holding area lock)
+ * and its rollback is then invoked outside that lock.
+ *
+ * @param id the id of the held transaction to roll back
+ * @throws IllegalStateException if no transaction is held under that id
+ * @throws Throwable anything thrown by the transaction's own rollback
+ */
+ public void rollbackTransaction(TransactionId id) throws Throwable
+ {
+ if (trace) { log.trace(this.nodeId + " rolling back transaction " + id ); }
+
+ ClusterTransaction tx = null;
+
+ synchronized (holdingArea)
+ {
+ tx = (ClusterTransaction)holdingArea.remove(id);
+ }
+
+ if (tx == null)
+ {
+ throw new IllegalStateException("Cannot find transaction transaction id: " + id);
+ }
+
+ tx.rollback(this);
+
+ //Log the actual outcome - this path rolls back, it does not commit
+ if (trace) { log.trace(this.nodeId + " rolled back transaction " + id ); }
+ }
+
/**
* Check for any transactions that need to be committed or rolled back
*/
@@ -997,43 +1012,61 @@
}
- public void handleMessagePullResult(int remoteNodeId, long holdingTxId, String queueName, List messages) throws Throwable
+ public void handleMessagePullResult(int remoteNodeId, long holdingTxId,
+ String queueName, org.jboss.messaging.core.Message message) throws Throwable
{
- if (trace) { log.trace(this.nodeId + " handling pull result " + messages + " for " + queueName); }
+ if (trace) { log.trace(this.nodeId + " handling pull result " + message + " for " + queueName); }
Binding binding = getBindingForQueueName(queueName);
- if (binding == null)
- {
- //This might happen if the queue is unbound
- return;
- }
-
- LocalClusteredQueue localQueue = (LocalClusteredQueue)binding.getQueue();
-
- RemoteQueueStub remoteQueue = localQueue.getPullQueue();
+ //The binding might be null if the queue was unbound
- if (remoteNodeId != remoteQueue.getNodeId())
- {
- //It might have changed since the request was sent
- Map bindings = (Map)nameMaps.get(new Integer(remoteNodeId));
+ boolean handled = false;
+
+ if (!failHandleResult && binding != null)
+ {
+ LocalClusteredQueue localQueue = (LocalClusteredQueue)binding.getQueue();
+
+ RemoteQueueStub remoteQueue = localQueue.getPullQueue();
- if (bindings != null)
+ if (remoteNodeId != remoteQueue.getNodeId())
{
- binding = (Binding)bindings.get(queueName);
+ //It might have changed since the request was sent
+ Map bindings = (Map)nameMaps.get(new Integer(remoteNodeId));
- if (binding != null)
- {
- remoteQueue = (RemoteQueueStub)binding.getQueue();
+ if (bindings != null)
+ {
+ binding = (Binding)bindings.get(queueName);
+
+ if (binding != null)
+ {
+ remoteQueue = (RemoteQueueStub)binding.getQueue();
+ }
}
}
+
+ if (remoteQueue != null)
+ {
+ localQueue.handlePullMessagesResult(remoteQueue, message, holdingTxId,
+ failBeforeCommit, failAfterCommit);
+
+ handled = true;
+ }
}
- if (remoteQueue != null)
+ if (!handled)
{
- localQueue.handlePullMessagesResult(remoteQueue, messages, holdingTxId,
- failBeforeCommit, failAfterCommit);
- }
+ //If we didn't handle it for whatever reason, then we might have to send a rollback
+ //message to the other node otherwise the transaction might end up in the holding
+ //area forever
+ if (message.isReliable())
+ {
+ //Only reliable messages will be in holding area
+ this.asyncSendRequest(new RollbackPullRequest(this.nodeId, holdingTxId), remoteNodeId);
+
+ if (trace) { log.trace(this.nodeId + " send rollback pull request"); }
+ }
+ }
}
@@ -1045,10 +1078,11 @@
// Public ------------------------------------------------------------------------------------------
//Used for testing only
- public void setFail(boolean beforeCommit, boolean afterCommit)
+ public void setFail(boolean beforeCommit, boolean afterCommit, boolean handleResult)
{
this.failBeforeCommit = beforeCommit;
this.failAfterCommit = afterCommit;
+ this.failHandleResult = handleResult;
}
//Used for testing only
@@ -1137,7 +1171,6 @@
{
if (trace) { log.trace(this.nodeId + " loading bindings"); }
- // TODO I need to know whether this call times out - how do I know this??
boolean isState = syncChannel.getState(null, stateTimeout);
if (!isState)
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java 2006-10-17 16:41:04 UTC (rev 1486)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java 2006-10-17 17:51:41 UTC (rev 1487)
@@ -29,6 +29,7 @@
import org.jboss.logging.Logger;
import org.jboss.messaging.core.Delivery;
import org.jboss.messaging.core.Filter;
+import org.jboss.messaging.core.Message;
import org.jboss.messaging.core.MessageReference;
import org.jboss.messaging.core.SimpleDelivery;
import org.jboss.messaging.core.local.PagingFilteredQueue;
@@ -178,12 +179,12 @@
acknowledgeInternal(d, null, false, false);
}
- public void handlePullMessagesResult(RemoteQueueStub remoteQueue, List messages,
+ public void handlePullMessagesResult(RemoteQueueStub remoteQueue, Message message,
long holdingTxId, boolean failBeforeCommit, boolean failAfterCommit) throws Exception
{
//This needs to be run on a different thread to the one used by JGroups to deliver the message
//to avoid deadlock
- Runnable runnable = new MessagePullResultRunnable(remoteQueue, messages, holdingTxId,
+ Runnable runnable = new MessagePullResultRunnable(remoteQueue, message, holdingTxId,
failBeforeCommit, failAfterCommit);
executor.execute(runnable);
@@ -280,12 +281,6 @@
}
}
- /*
- * Send a message pull request.
- *
- * TODO - do we really need this class?
- * Why can't we just execute on the same thread?
- */
private class SendPullRequestRunnable implements Runnable
{
private RemoteQueueStub theQueue;
@@ -325,18 +320,18 @@
* tx thus avoiding 2PC and maintaining reliability :)
* We do the following:
*
- * 1. Send a PullMessagesRequest to the remote node, on receipt it will create deliveries for message(s), and
+ * 1. Send a PullMessagesRequest to the remote node, on receipt it will create a delivery for the message, and
* possibly add a holding tx (if there are any persistent messages), the messages will then be returned in
* a PullMessagesResultRequest which is sent asynchronously from the remote node back to here to avoid
* distributed deadlock.
* 2. When the result is returned it hits this method.
- * 3. The retrieved messages are added to the local queue in the tx
- * 4. Deliveries corresponding to the messages retrieved are acknowledged LOCALLY for the remote queue.
+ * 3. The retrieved message is added to the local queue in the tx
+ * 4. Delivery corresponding to the message is acknowledged LOCALLY for the remote queue.
* 5. The local tx is committed.
* 6. Send "commit" message to remote node
- * 7. "Commit" message is received and deliveries in the holding transaction are acknowledged IN MEMORY only.
- * On failure, commit or rollback will be called on the holding transaction causing the deliveries to be acked or cancelled
- * depending on whether they exist in the database
+ * 7. "Commit" message is received and delivery in the holding transaction is acknowledged IN MEMORY only.
+ * On failure, commit or rollback will be called on the holding transaction causing the delivery to be acked or cancelled
+ * depending on whether it exists in the database
*
* Recovery is handled in the same way as CastMessagesCallback
*
@@ -367,82 +362,57 @@
{
try
{
- List dels = null;
+ Delivery del = null;
- //We only get the refs if receiversReady = false so as not to steal messages that
+ //We only get the delivery if receiversReady = false so as not to steal messages that
//might be consumed by local receivers
if (!receiversReady)
- {
- int count = 0;
-
+ {
MessageReference ref;
- dels = new ArrayList();
-
synchronized (refLock)
{
synchronized (deliveryLock)
{
- while (count < number && (ref = removeFirstInMemory()) != null)
+ ref = removeFirstInMemory();
+
+ if (ref != null)
{
- SimpleDelivery del = new SimpleDelivery(LocalClusteredQueue.this, ref);
+ del = new SimpleDelivery(LocalClusteredQueue.this, ref);
deliveries.add(del);
-
- dels.add(del);
-
- count++;
- }
+ }
}
}
}
- else
- {
- dels = Collections.EMPTY_LIST;
- }
- if (trace) { log.trace("PullMessagesRunnable got " + dels.size() + " deliveries"); }
+ if (trace) { log.trace("PullMessagesRunnable got " + del); }
- PullMessagesResultRequest response = new PullMessagesResultRequest(LocalClusteredQueue.this.nodeId, txId.getTxId(), name, dels.size());
-
- List reliableDels = null;
-
- if (!dels.isEmpty())
- {
- Iterator iter = dels.iterator();
-
- Delivery del = (Delivery)iter.next();
-
- if (del.getReference().isReliable())
+ if (del != null)
+ {
+ PullMessageResultRequest response =
+ new PullMessageResultRequest(LocalClusteredQueue.this.nodeId, txId.getTxId(),
+ name,
+ del.getReference().getMessage());
+
+ if (!del.getReference().isReliable())
{
- //Add it to internal list
- if (reliableDels == null)
- {
- reliableDels = new ArrayList();
- }
-
- reliableDels.add(del);
- }
- else
- {
//We can ack it now
del.acknowledge(null);
}
+ else
+ {
+ //Add this to the holding area
+ tx.setReliableDelivery(del);
+
+ office.holdTransaction(txId, tx);
+ }
+
+ //We send the message asynchronously to avoid a deadlock situation which can occur
+ //if we were using MessageDispatcher to get the message.
- response.addMessage(del.getReference().getMessage());
+ office.asyncSendRequest(response, returnNodeId);
}
-
- if (reliableDels != null)
- {
- //Add this to the holding area
- tx.setReliableDels(reliableDels);
- office.holdTransaction(txId, tx);
- }
-
- //We send the messages asynchronously to avoid a deadlock situation which can occur
- //if we were using MessageDispatcher to get the messages.
-
- office.asyncSendRequest(response, returnNodeId);
}
catch (Throwable e)
{
@@ -455,7 +425,7 @@
{
private RemoteQueueStub remoteQueue;
- private List messages;
+ private Message message;
private long holdingTxId;
@@ -464,12 +434,12 @@
private boolean failAfterCommit;
private MessagePullResultRunnable(RemoteQueueStub remoteQueue,
- List messages, long holdingTxId,
+ Message message, long holdingTxId,
boolean failBeforeCommit, boolean failAfterCommit)
{
this.remoteQueue = remoteQueue;
- this.messages = messages;
+ this.message = message;
this.holdingTxId = holdingTxId;
@@ -481,66 +451,57 @@
{
try
{
- // TODO we should optimise for the case when only one message is pulled which is basically all
- //we support now anyway
- //Also we should optimise for the case when only non persistent messages are pulled
- //in this case we don't need to create a tx.
+ Transaction tx = null;
- Transaction tx = tr.createTransaction();
+ boolean handleTransactionally = message.isReliable() && isRecoverable();
- Iterator iter = messages.iterator();
+ if (handleTransactionally)
+ {
+ tx = tr.createTransaction();
+
+ //It will already have been persisted on the other node
+ //so we need to set the persisted flag here
+ message.setPersisted(true);
+ }
- boolean containsReliable = false;
-
- while (iter.hasNext())
+ MessageReference ref = null;
+
+ try
{
- org.jboss.messaging.core.Message msg = (org.jboss.messaging.core.Message)iter.next();
+ ref = ms.reference(message);
- if (msg.isReliable())
+ //Should be executed synchronously since we are already in the event queue
+ Delivery delRet = handleInternal(null, ref, tx, true, true);
+
+ if (delRet == null || !delRet.isSelectorAccepted())
{
- //It will already have been persisted on the other node
- //so we need to set the persisted flag here
- msg.setPersisted(true);
-
- containsReliable = true;
- }
-
- MessageReference ref = null;
-
- try
+ //This should never happen
+ throw new IllegalStateException("Queue did not accept reference!");
+ }
+ }
+ finally
+ {
+ if (ref != null)
{
- ref = ms.reference(msg);
-
- //Should be executed synchronously since we already in the event queue
- Delivery delRet = handleInternal(null, ref, tx, true, true);
-
- if (delRet == null || !delRet.isSelectorAccepted())
- {
- //This should never happen
- throw new IllegalStateException("Queue did not accept reference!");
- }
+ ref.releaseMemoryReference();
}
- finally
- {
- if (ref != null)
- {
- ref.releaseMemoryReference();
- }
- }
-
- //Acknowledge on the remote queue stub
- Delivery del = new SimpleDelivery(remoteQueue, ref);
-
- del.acknowledge(tx);
}
+
+ //Acknowledge on the remote queue stub
+ Delivery del = new SimpleDelivery(remoteQueue, ref);
+ del.acknowledge(tx);
+
//For testing to simulate failures
if (failBeforeCommit)
{
throw new Exception("Test failure before commit");
}
- tx.commit();
+ if (handleTransactionally)
+ {
+ tx.commit();
+ }
//For testing to simulate failures
if (failAfterCommit)
@@ -555,9 +516,7 @@
//and send a checkrequest
//This applies to a normal message and messages requests too
- //We only need to send a commit message if there were reliable messages since otherwise
- //the transaction wouldn't have been added in the holding area
- if (containsReliable && isRecoverable())
+ if (handleTransactionally)
{
ClusterRequest req = new PullMessagesRequest(nodeId, holdingTxId);
@@ -568,9 +527,6 @@
{
log.error("Failed to handle pulled message", e);
}
- }
-
- }
-
-
+ }
+ }
}
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PostOfficeInternal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PostOfficeInternal.java 2006-10-17 16:41:04 UTC (rev 1486)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PostOfficeInternal.java 2006-10-17 17:51:41 UTC (rev 1487)
@@ -62,11 +62,13 @@
void commitTransaction(TransactionId id) throws Throwable;
+ void rollbackTransaction(TransactionId id) throws Throwable;
+
void updateQueueStats(int nodeId, List stats) throws Exception;
void sendQueueStats() throws Exception;
boolean referenceExistsInStorage(long channelID, long messageID) throws Exception;
- void handleMessagePullResult(int remoteNodeId, long holdingTxId, String queueName, List messages) throws Throwable;
+ void handleMessagePullResult(int remoteNodeId, long holdingTxId, String queueName, Message message) throws Throwable;
}
Copied: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessageResultRequest.java (from rev 1485, trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesResultRequest.java)
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesResultRequest.java 2006-10-17 14:49:19 UTC (rev 1485)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessageResultRequest.java 2006-10-17 17:51:41 UTC (rev 1487)
@@ -0,0 +1,114 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.plugin.postoffice.cluster;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+
+import org.jboss.logging.Logger;
+import org.jboss.messaging.core.Message;
+import org.jboss.messaging.core.message.MessageFactory;
+
+/**
+ *
+ * A PullMessageResultRequest
+ *
+ * Cluster request that carries a single pulled message together with the id of
+ * the node that sent it, the id of the holding transaction and the queue name.
+ * When executed it passes these values to
+ * PostOfficeInternal.handleMessagePullResult().
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ *
+ * $Id$
+ *
+ */
+public class PullMessageResultRequest extends ClusterRequest
+{
+ private static final Logger log = Logger.getLogger(PullMessageResultRequest.class);
+ // Request type discriminator; the ClusterRequest factory switch maps it to this class
+ public static final int TYPE = 2;
+ // Id of the holding transaction, passed through to handleMessagePullResult()
+ private long holdingTxId;
+ // Name of the queue, passed through to handleMessagePullResult()
+ private String queueName;
+ // The single pulled message carried by this request
+ private Message message;
+ // Id of the node that sent this result
+ private int remoteNodeId;
+ // No-arg constructor; fields are presumably filled in afterwards by read() - TODO confirm
+ PullMessageResultRequest()
+ {
+ }
+ // Creates a fully populated request ready to be written out
+ PullMessageResultRequest(int remoteNodeId, long holdingTxId, String queueName, Message message)
+ {
+ this.remoteNodeId = remoteNodeId;
+
+ this.holdingTxId = holdingTxId;
+
+ this.queueName = queueName;
+
+ this.message = message;
+ }
+ // Returns the message carried by this request
+ Message getMessage()
+ {
+ return message;
+ }
+ // Reads the fields in the same order write() emits them
+ public void read(DataInputStream in) throws Exception
+ {
+ remoteNodeId = in.readInt();
+
+ holdingTxId = in.readLong();
+
+ queueName = in.readUTF();
+
+ byte type = in.readByte();
+
+ message = MessageFactory.createMessage(type);
+
+ message.read(in);
+ }
+ // Writes: node id, holding tx id, queue name, message type byte, then the message itself
+ public void write(DataOutputStream out) throws Exception
+ {
+ out.writeInt(remoteNodeId);
+
+ out.writeLong(holdingTxId);
+
+ out.writeUTF(queueName);
+
+ out.writeByte(message.getType());
+
+ message.write(out);
+ }
+ // Hands the carried message to the post office; always returns null
+ Object execute(PostOfficeInternal office) throws Throwable
+ {
+ office.handleMessagePullResult(remoteNodeId, holdingTxId, queueName, message);
+
+ return null;
+ }
+ // Returns the TYPE discriminator of this request
+ byte getType()
+ {
+ return TYPE;
+ }
+}
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesRequest.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesRequest.java 2006-10-17 16:41:04 UTC (rev 1486)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesRequest.java 2006-10-17 17:51:41 UTC (rev 1487)
@@ -49,7 +49,7 @@
private int numMessages;
- private List reliableDels;
+ private Delivery reliableDelivery;
static final int TYPE = 5;
@@ -101,9 +101,9 @@
}
//TODO this is a bit messsy - must be a nicer way of setting this
- void setReliableDels(List reliableDels)
+ void setReliableDelivery(Delivery del)
{
- this.reliableDels = reliableDels;
+ this.reliableDelivery = del;
}
byte getType()
@@ -113,21 +113,15 @@
public boolean check(PostOfficeInternal office) throws Exception
{
- // If the messages DON'T exist in the database then we should commit the transaction
- // Since the acks have already been processed persistently
+ // If the message doesn't exist in the database then we should commit the transaction
+ // Since the ack has already been processed persistently
// otherwise we should roll it back
-
- Iterator iter = reliableDels.iterator();
-
- //We only need to check one of them since they would all have been acked in a tx
-
- Delivery del = (Delivery)iter.next();
-
+
//We store the channelID of one of the channels that the message was persisted in
//it doesn't matter which one since they were all inserted in the same tx
- if (office.referenceExistsInStorage(checkChannelID, del.getReference().getMessageID()))
+ if (office.referenceExistsInStorage(checkChannelID, reliableDelivery.getReference().getMessageID()))
{
//We should rollback
return false;
@@ -141,34 +135,20 @@
public void commit(PostOfficeInternal office) throws Throwable
{
- //We need to ack the deliveries
+ //We need to ack the delivery
- Iterator iter = reliableDels.iterator();
-
- while (iter.hasNext())
- {
- Delivery del = (Delivery)iter.next();
-
- //We need to ack them in memory only
- //since they would have been acked on the pulling node
- LocalClusteredQueue queue = (LocalClusteredQueue)del.getObserver();
-
- queue.acknowledgeFromCluster(del);
- }
+ //We need to ack it in memory only
+ //since it would have been acked on the pulling node
+ LocalClusteredQueue queue = (LocalClusteredQueue)reliableDelivery.getObserver();
+
+ queue.acknowledgeFromCluster(reliableDelivery);
}
public void rollback(PostOfficeInternal office) throws Throwable
{
- //We need to cancel the deliveries
+ //We need to cancel the delivery
- Iterator iter = reliableDels.iterator();
-
- while (iter.hasNext())
- {
- Delivery del = (Delivery)iter.next();
-
- del.cancel();
- }
+ reliableDelivery.cancel();
}
public void read(DataInputStream in) throws Exception
Deleted: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesResultRequest.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesResultRequest.java 2006-10-17 16:41:04 UTC (rev 1486)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesResultRequest.java 2006-10-17 17:51:41 UTC (rev 1487)
@@ -1,140 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.messaging.core.plugin.postoffice.cluster;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-import org.jboss.logging.Logger;
-import org.jboss.messaging.core.Message;
-import org.jboss.messaging.core.message.MessageFactory;
-
-/**
- *
- * A PullMessagesResultRequest
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @version <tt>$Revision: 1.1 $</tt>
- *
- * $Id$
- *
- */
-public class PullMessagesResultRequest extends ClusterRequest
-{
- private static final Logger log = Logger.getLogger(PullMessagesResultRequest.class);
-
- public static final int TYPE = 2;
-
- private long holdingTxId;
-
- private String queueName;
-
- private List messages;
-
- private int remoteNodeId;
-
- PullMessagesResultRequest()
- {
- }
-
- PullMessagesResultRequest(int remoteNodeId, long holdingTxId, String queueName, int size)
- {
- this.remoteNodeId = remoteNodeId;
-
- this.holdingTxId = holdingTxId;
-
- this.queueName = queueName;
-
- messages = new ArrayList(size);
- }
-
- void addMessage(Message msg)
- {
- messages.add(msg);
- }
-
- List getMessages()
- {
- return messages;
- }
-
- public void read(DataInputStream in) throws Exception
- {
- remoteNodeId = in.readInt();
-
- holdingTxId = in.readLong();
-
- queueName = in.readUTF();
-
- int num = in.readInt();
-
- messages = new ArrayList(num);
-
- for (int i = 0; i < num; i++)
- {
- byte type = in.readByte();
-
- Message msg = MessageFactory.createMessage(type);
-
- msg.read(in);
-
- messages.add(msg);
- }
- }
-
- public void write(DataOutputStream out) throws Exception
- {
- out.writeInt(remoteNodeId);
-
- out.writeLong(holdingTxId);
-
- out.writeUTF(queueName);
-
- out.writeInt(messages.size());
-
- Iterator iter = messages.iterator();
-
- while (iter.hasNext())
- {
- Message msg = (Message)iter.next();
-
- out.writeByte(msg.getType());
-
- msg.write(out);
- }
- }
-
- Object execute(PostOfficeInternal office) throws Throwable
- {
- office.handleMessagePullResult(remoteNodeId, holdingTxId, queueName, messages);
-
- return null;
- }
-
- byte getType()
- {
- return TYPE;
- }
-}
Added: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RollbackPullRequest.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RollbackPullRequest.java 2006-10-17 16:41:04 UTC (rev 1486)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RollbackPullRequest.java 2006-10-17 17:51:41 UTC (rev 1487)
@@ -0,0 +1,81 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.plugin.postoffice.cluster;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+
+/**
+ * A RollbackPullRequest
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ *
+ * $Id$
+ *
+ */
+public class RollbackPullRequest extends ClusterRequest
+{
+ static final int TYPE = 10;
+
+ private int nodeId;
+
+ private long txId;
+
+ public RollbackPullRequest()
+ {
+ }
+
+ RollbackPullRequest(int nodeId, long txId)
+ {
+ this.nodeId = nodeId;
+
+ this.txId = txId;
+ }
+
+ Object execute(PostOfficeInternal office) throws Throwable
+ {
+ office.rollbackTransaction(new TransactionId(nodeId, txId));
+
+ return null;
+ }
+
+ byte getType()
+ {
+ return TYPE;
+ }
+
+ public void read(DataInputStream in) throws Exception
+ {
+ nodeId = in.readInt();
+
+ txId = in.readLong();
+ }
+
+ public void write(DataOutputStream out) throws Exception
+ {
+ out.writeInt(nodeId);
+
+ out.writeLong(txId);
+ }
+
+}
Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeTest.java 2006-10-17 16:41:04 UTC (rev 1486)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeTest.java 2006-10-17 17:51:41 UTC (rev 1487)
@@ -2227,7 +2227,7 @@
groupName,
JGroupsUtil.getControlStackProperties(),
JGroupsUtil.getDataStackProperties(),
- 5000, 5000, pullPolicy, rf, 1, 1000);
+ 5000, 5000, pullPolicy, rf, 1000);
postOffice.start();
Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeWithDefaultRouterTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeWithDefaultRouterTest.java 2006-10-17 16:41:04 UTC (rev 1486)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeWithDefaultRouterTest.java 2006-10-17 17:51:41 UTC (rev 1487)
@@ -384,7 +384,7 @@
groupName,
JGroupsUtil.getControlStackProperties(),
JGroupsUtil.getDataStackProperties(),
- 5000, 5000, redistPolicy, rf, 1, 1000);
+ 5000, 5000, redistPolicy, rf, 1000);
postOffice.start();
Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultRouterTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultRouterTest.java 2006-10-17 16:41:04 UTC (rev 1486)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultRouterTest.java 2006-10-17 17:51:41 UTC (rev 1487)
@@ -359,7 +359,7 @@
groupName,
JGroupsUtil.getControlStackProperties(),
JGroupsUtil.getDataStackProperties(),
- 5000, 5000, redistPolicy, rf, 1, 1000);
+ 5000, 5000, redistPolicy, rf, 1000);
postOffice.start();
Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RecoveryTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RecoveryTest.java 2006-10-17 16:41:04 UTC (rev 1486)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RecoveryTest.java 2006-10-17 17:51:41 UTC (rev 1487)
@@ -116,7 +116,7 @@
queue3.add(receiver3);
//This will make it fail after casting but before persisting the message in the db
- office1.setFail(true, false);
+ office1.setFail(true, false, false);
Transaction tx = tr.createTransaction();
@@ -251,7 +251,7 @@
queue3.add(receiver3);
//This will make it fail after casting and persisting the message in the db
- office1.setFail(false, true);
+ office1.setFail(false, true, false);
Transaction tx = tr.createTransaction();
@@ -361,7 +361,7 @@
groupName,
JGroupsUtil.getControlStackProperties(),
JGroupsUtil.getDataStackProperties(),
- 5000, 5000, redistPolicy, rf, 1, 1000);
+ 5000, 5000, redistPolicy, rf, 1000);
postOffice.start();
Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RedistributionWithDefaultMessagePullPolicyTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RedistributionWithDefaultMessagePullPolicyTest.java 2006-10-17 16:41:04 UTC (rev 1486)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RedistributionWithDefaultMessagePullPolicyTest.java 2006-10-17 17:51:41 UTC (rev 1487)
@@ -87,7 +87,6 @@
super.tearDown();
}
-
public void testConsumeAllNonPersistentNonRecoverable() throws Throwable
{
consumeAll(false, false);
@@ -107,9 +106,7 @@
{
consumeAll(true, true);
}
-
-
-
+
public void testConsumeBitByBitNonPersistentNonRecoverable() throws Throwable
{
consumeBitByBit(false, false);
@@ -130,30 +127,6 @@
consumeBitByBit(true, true);
}
-
-
-
-//
-// public void testConsumeConcurrentlyNonPersistentNonRecoverable() throws Throwable
-// {
-// consumeConcurrently(false, false);
-// }
-//
-// public void testConsumeConsumeConcurrentlyPersistentNonRecoverable() throws Throwable
-// {
-// consumeConcurrently(true, false);
-// }
-//
-// public void testConsumeConsumeConcurrentlyNonPersistentRecoverable() throws Throwable
-// {
-// consumeConcurrently(false, true);
-// }
-//
-// public void testConsumeConsumeConcurrentlyPersistentRecoverable() throws Throwable
-// {
-// consumeConcurrently(true, true);
-// }
-
public void testSimpleMessagePull() throws Throwable
{
DefaultClusteredPostOffice office1 = null;
@@ -301,7 +274,7 @@
receiver2.setMaxRefs(1);
//Force a failure before commit
- office2.setFail(true, false);
+ office2.setFail(true, false, false);
log.info("delivering");
queue2.deliver(false);
@@ -401,7 +374,7 @@
receiver2.setMaxRefs(1);
//Force a failure after committing the ack to storage
- office2.setFail(false, true);
+ office2.setFail(false, true, false);
log.info("delivering");
queue2.deliver(false);
@@ -444,6 +417,93 @@
}
}
+ public void testFailHandleMessagePullResult() throws Throwable
+ {
+ DefaultClusteredPostOffice office1 = null;
+
+ DefaultClusteredPostOffice office2 = null;
+
+ try
+ {
+ office1 = (DefaultClusteredPostOffice)createClusteredPostOffice(1, "testgroup");
+
+ office2 = (DefaultClusteredPostOffice)createClusteredPostOffice(2, "testgroup");
+
+ LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, 1, "queue1", channelIdManager.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
+ Binding binding1 =
+ office1.bindClusteredQueue("queue1", queue1);
+
+ LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, 2, "queue1", channelIdManager.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
+ Binding binding2 =
+ office2.bindClusteredQueue("queue1", queue2);
+
+ Message msg = CoreMessageFactory.createCoreMessage(1);
+ msg.setReliable(true);
+
+ MessageReference ref = ms.reference(msg);
+
+ office1.route(ref, "queue1", null);
+
+ Thread.sleep(2000);
+
+ //Messages should all be in queue1
+
+ List msgs = queue1.browse();
+ assertEquals(1, msgs.size());
+
+ msgs = queue2.browse();
+ assertTrue(msgs.isEmpty());
+
+ SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING_TO_MAX);
+ receiver1.setMaxRefs(0);
+ queue1.add(receiver1);
+ SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING_TO_MAX);
+ receiver2.setMaxRefs(0);
+ queue2.add(receiver2);
+
+ //Prompt delivery so the channels know if the receivers are ready
+ queue1.deliver(false);
+ Thread.sleep(2000);
+
+ //Pull from 1 to 2
+
+ receiver2.setMaxRefs(1);
+
+ office2.setFail(false, false, true);
+
+ log.info("delivering");
+ queue2.deliver(false);
+
+ Thread.sleep(3000);
+
+ //The delivery should be rolled back
+
+ assertTrue(office1.getHoldingTransactions().isEmpty());
+ assertTrue(office2.getHoldingTransactions().isEmpty());
+
+ log.info("queue1 refs:" + queue1.memoryRefCount() + " dels:" + queue1.memoryDeliveryCount());
+ log.info("queue2 refs:" + queue2.memoryRefCount() + " dels:" + queue2.memoryDeliveryCount());
+
+ assertEquals(1, queue1.memoryRefCount());
+ assertEquals(0, queue1.memoryDeliveryCount());
+
+ assertEquals(0, queue2.memoryRefCount());
+ assertEquals(0, queue2.memoryDeliveryCount());
+ }
+ finally
+ {
+ if (office1 != null)
+ {
+ office1.stop();
+ }
+
+ if (office2 != null)
+ {
+ office2.stop();
+ }
+ }
+ }
+
protected void consumeAll(boolean persistent, boolean recoverable) throws Throwable
{
DefaultClusteredPostOffice office1 = null;
@@ -946,134 +1006,8 @@
office5.stop();
}
}
- }
+ }
- protected void consumeConcurrently(boolean persistent, boolean recoverable) throws Throwable
- {
- DefaultClusteredPostOffice office1 = null;
-
- DefaultClusteredPostOffice office2 = null;
-
- DefaultClusteredPostOffice office3 = null;
-
- DefaultClusteredPostOffice office4 = null;
-
- DefaultClusteredPostOffice office5 = null;
-
- try
- {
- office1 = (DefaultClusteredPostOffice)createClusteredPostOffice(1, "testgroup");
-
- office2 = (DefaultClusteredPostOffice)createClusteredPostOffice(2, "testgroup");
-
- office3 = (DefaultClusteredPostOffice)createClusteredPostOffice(3, "testgroup");
-
- office4 = (DefaultClusteredPostOffice)createClusteredPostOffice(4, "testgroup");
-
- office5 = (DefaultClusteredPostOffice)createClusteredPostOffice(5, "testgroup");
-
- LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, 1, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);
- Binding binding1 = office1.bindClusteredQueue("queue1", queue1);
-
- LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, 2, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);
- Binding binding2 = office2.bindClusteredQueue("queue1", queue2);
-
- LocalClusteredQueue queue3 = new LocalClusteredQueue(office3, 3, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);
- Binding binding3 = office3.bindClusteredQueue("queue1", queue3);
-
- LocalClusteredQueue queue4 = new LocalClusteredQueue(office4, 4, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);
- Binding binding4 = office4.bindClusteredQueue("queue1", queue4);
-
- LocalClusteredQueue queue5 = new LocalClusteredQueue(office5, 5, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);
- Binding binding5 = office5.bindClusteredQueue("queue1", queue5);
-
- //Test with no consumers on queue1
-
- //Two equal consumers on queue2 and queue3
-
- //Add messages at queue 1
-
- final int NUM_MESSAGES = 10000;
-
- this.sendMessages("queue1", persistent, office1, NUM_MESSAGES, null);
-
- log.info("sent messages");
-
- Thread.sleep(4000);
-
- ThrottleReceiver receiver1 = new ThrottleReceiver(queue1, 0, 50);
- queue1.add(receiver1);
- queue1.deliver(false);
-
- ThrottleReceiver receiver2 = new ThrottleReceiver(queue2, 0, 50);
- queue2.add(receiver2);
- queue2.deliver(false);
-
- Thread.sleep(45000);
-
- log.info("receiver1: " + receiver1.getTotalCount());
-
- log.info("receiver2: " + receiver2.getTotalCount());
-
-
- //test1
-
-
- //No consumer on node 1
- //Very slow consumer on node 2
- //
-
- /*
- * Test with very fast, infinitely big consumer (i.e. is always ready) on node 1
- * Fast consumer on node2
- * Send messages on node 1
- * Verify all go to node1 consumer
- *
- * Test with very fast, not infinitely big consumer (i.e. is not always ready) on node 1
- * Fast consumer on node2
- * Send messages on node 1
- * Verify most go to node1 consumer, some go to node 2
- *
- * Test with slow consumer on node 1, Fast consumer on node 2
- *
- * Test with no consumer on node 1, consumers on other nodes
- *
- * Things up all the other permutations, then take a guess with error margin of
- * how many messages should be on each node.
- */
-
-
- checkNoMessageData();
- }
- finally
- {
- if (office1 != null)
- {
- office1.stop();
- }
-
- if (office2 != null)
- {
- office2.stop();
- }
-
- if (office3 != null)
- {
- office3.stop();
- }
-
- if (office4 != null)
- {
- office4.stop();
- }
-
- if (office5 != null)
- {
- office5.stop();
- }
- }
- }
-
class ThrottleReceiver implements Receiver, Runnable
{
long pause;
@@ -1218,7 +1152,7 @@
groupName,
JGroupsUtil.getControlStackProperties(),
JGroupsUtil.getDataStackProperties(),
- 10000, 10000, pullPolicy, rf, 1, 1000);
+ 10000, 10000, pullPolicy, rf, 1000);
postOffice.start();
More information about the jboss-cvs-commits
mailing list