[jboss-cvs] JBoss Messaging SVN: r1473 - in trunk: src/main/org/jboss/jms/server src/main/org/jboss/jms/server/endpoint src/main/org/jboss/messaging/core src/main/org/jboss/messaging/core/local src/main/org/jboss/messaging/core/plugin src/main/org/jboss/messaging/core/plugin/postoffice/cluster tests/src/org/jboss/test/messaging/core tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster tests/src/org/jboss/test/messaging/jms
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Oct 13 08:09:50 EDT 2006
Author: timfox
Date: 2006-10-13 08:09:35 -0400 (Fri, 13 Oct 2006)
New Revision: 1473
Removed:
trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RedistributionTest.java
Modified:
trunk/src/main/org/jboss/jms/server/ServerPeer.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
trunk/src/main/org/jboss/messaging/core/ChannelSupport.java
trunk/src/main/org/jboss/messaging/core/PagingChannelSupport.java
trunk/src/main/org/jboss/messaging/core/SimpleDelivery.java
trunk/src/main/org/jboss/messaging/core/local/RoundRobinPointToPointRouter.java
trunk/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/CastMessagesCallback.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultMessagePullPolicy.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultRouter.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PostOfficeInternal.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesRequest.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/StatsSender.java
trunk/tests/src/org/jboss/test/messaging/core/SimpleReceiver.java
trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeWithDefaultRouterTest.java
trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultRouterTest.java
trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RecoveryTest.java
trunk/tests/src/org/jboss/test/messaging/jms/BrowserTest.java
Log:
http://jira.jboss.com/jira/browse/JBMESSAGING-606 http://jira.jboss.com/jira/browse/JBMESSAGING-575 http://jira.jboss.com/jira/browse/JBMESSAGING-596
Modified: trunk/src/main/org/jboss/jms/server/ServerPeer.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/ServerPeer.java 2006-10-12 15:47:04 UTC (rev 1472)
+++ trunk/src/main/org/jboss/jms/server/ServerPeer.java 2006-10-13 12:09:35 UTC (rev 1473)
@@ -821,7 +821,7 @@
// <path refid="jboss.jmx.classpath"/> from jms/build.xml dependentmodule.classpath
//
- //FIXME - Yes this is super-ugly - there must be an easier way of doing it
+ //TODO - Yes this is super-ugly - there must be an easier way of doing it
//also in LocalTestServer is doing the same thing in a slightly different way
//this should be combined
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2006-10-12 15:47:04 UTC (rev 1472)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2006-10-13 12:09:35 UTC (rev 1473)
@@ -176,7 +176,7 @@
if (trace) log.trace("created selector");
}
- //FIXME -
+ //TODO -
//We really need to get rid of this delivery list - it's only purpose in life is to solve
//the race condition where acks or cancels can come in before handle has returned - and
//that can be solved in a simpler way anyway.
@@ -350,7 +350,7 @@
// it. This is because it may still contain deliveries that may well be acknowledged
// after the consumer has closed. This is perfectly valid.
- // FIXME - The deliveries should really be stored in the session endpoint, not here
+ // TODO - The deliveries should really be stored in the session endpoint, not here
// that is their natural place, that would mean we wouldn't have to mess around with
// keeping deliveries after this is closed.
Modified: trunk/src/main/org/jboss/messaging/core/ChannelSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/ChannelSupport.java 2006-10-12 15:47:04 UTC (rev 1472)
+++ trunk/src/main/org/jboss/messaging/core/ChannelSupport.java 2006-10-13 12:09:35 UTC (rev 1473)
@@ -51,10 +51,6 @@
* and deliver to receivers are not executed concurrently but placed on an event
* queue and executed serially by a single thread.
*
- * This prevents lock contention since requests are
- * executed serially, resulting in better scalability and higher throughput at the expense of some
- * latency.
- *
* Currently remoting does not support a non blocking API so a full SEDA approach is not possible at this stage.
*
* @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
@@ -99,7 +95,7 @@
protected Object deliveryLock;
protected boolean active = true;
-
+
// Constructors --------------------------------------------------
protected ChannelSupport(long channelID, MessageStore ms,
@@ -172,7 +168,7 @@
}
else
{
- return handleInternal(sender, ref, tx, true, false);
+ return handleInternal(sender, ref, tx, true, false, true);
}
}
@@ -189,7 +185,13 @@
public void cancel(Delivery d) throws Throwable
{
// TODO We should also consider executing cancels on the event queue
- cancelInternal(d);
+ synchronized (deliveryLock)
+ {
+ synchronized (refLock)
+ {
+ cancelInternal(d);
+ }
+ }
}
// Distributor implementation ------------------------------------
@@ -382,6 +384,7 @@
del.acknowledge(null);
}
+
}
}
}
@@ -491,6 +494,7 @@
// Public --------------------------------------------------------
+ //Only used for testing
public int memoryRefCount()
{
synchronized (refLock)
@@ -499,6 +503,7 @@
}
}
+ //Only used for testing
public int memoryDeliveryCount()
{
synchronized (deliveryLock)
@@ -515,7 +520,35 @@
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
-
+
+ protected MessageReference nextReference(ListIterator iter, boolean handle) throws Throwable
+ {
+ MessageReference ref;
+
+ if (iter == null)
+ {
+ //We just get the next ref from the head of the queue
+ ref = (MessageReference) messageRefs.peekFirst();
+ }
+ else
+ {
+ // TODO This will not work with paged refs - see http://jira.jboss.com/jira/browse/JBMESSAGING-275
+ // We need to extend it to work with refs from the db
+
+ //We have an iterator - this means we are iterating through the queue to find a ref that matches
+ if (iter.hasNext())
+ {
+ ref = (MessageReference)iter.next();
+ }
+ else
+ {
+ ref = null;
+ }
+ }
+
+ return ref;
+ }
+
/*
* This methods delivers as many messages as possible to the router until no
* more deliveries are returned. This method should never be called at the
@@ -535,47 +568,16 @@
while (true)
{
- synchronized (refLock)
- {
- if (iter == null)
- {
- ref = (MessageReference) messageRefs.peekFirst();
- }
- else
- {
- if (iter.hasNext())
- {
- ref = (MessageReference)iter.next();
- }
- else
- {
- ref = null;
- }
- }
+ synchronized (deliveryLock)
+ {
+ ref = nextReference(iter, handle);
}
-
if (ref != null)
{
- // Check if message is expired (we also do this on the client
- // side)
- // If so ack it from the channel
+ // Check if message is expired (we also do this on the client side). If so, ack it from the channel
if (ref.isExpired())
{
- if (trace) { log.trace("Message reference: " + ref + " has expired"); }
-
- // remove and acknowledge it
- if (iter == null)
- {
- removeFirstInMemory();
- }
- else
- {
- iter.remove();
- }
-
- Delivery delivery = new SimpleDelivery(this, ref, true);
-
- acknowledgeInternal(delivery, null, true, false);
+ expireRef(ref, iter);
}
else
{
@@ -589,33 +591,24 @@
if (del == null)
{
// No receiver, broken receiver or full receiver so we stop delivering; also
- // we need to decrement the delivery count, as no real delivery has been
- // actually performed
+ // we need to decrement the delivery count, as no real delivery has been actually performed
if (trace) { log.trace(this + ": no delivery returned for message" + ref + " so no receiver got the message. Delivery is now complete"); }
ref.decrementDeliveryCount();
+
receiversReady = false;
- return;
+
+ break;
}
else if (!del.isSelectorAccepted())
{
// No receiver accepted the message because no selectors matched, so we create
// an iterator (if we haven't already created it) to iterate through the refs
- // in the channel. No delivery was really performed, so we decrement the
- // delivery count
+ // in the channel. No delivery was really performed, so we decrement the delivery count
ref.decrementDeliveryCount();
- // TODO Note that this is only a partial solution since if there are messages
- // paged to storage it won't try those - i.e. it will only iterate through
- // those refs in memory. Dealing with refs in storage is somewhat tricky since
- // we can't just load them and iterate through them since we might run out of
- // memory, so we will need to load individual refs from storage given the
- // selector expressions. Secondly we should also introduce some in memory
- // indexes here to prevent having to iterate through all the refs every time.
- // Having said all that, having consumers on a queue that don't match many
- // messages is an antipattern and should be avoided by the user.
if (iter == null)
{
iter = messageRefs.iterator();
@@ -627,7 +620,7 @@
// Receiver accepted the reference
- // We must synchronize here to cope with another race condition where message
+ // We must synchronize here to cope with a race condition where message
// is cancelled/acked in flight while the following few actions are being
// performed. e.g. delivery could be cancelled acked after being removed from
// state but before delivery being added (observed).
@@ -635,29 +628,13 @@
{
if (trace) { log.trace(this + " incrementing delivery count for " + del); }
- // FIXME - It's actually possible the delivery could be
- // cancelled before it reaches
- // here, in which case we wouldn't get a delivery but we
- // still need to increment the
- // delivery count
- // All the problems related to these race conditions and
- // fiddly edge cases will disappear
- // once we do
- // http://jira.jboss.com/jira/browse/JBMESSAGING-355
- // This will make life a lot easier
+ // FIXME - It's actually possible the delivery could be cancelled before it reaches
+ // here, in which case we wouldn't get a delivery but we still need to increment the
+ // delivery count. TODO http://jira.jboss.com/jira/browse/JBMESSAGING-355
- del.getReference().incrementDeliveryCount();
-
if (!del.isCancelled())
{
- if (iter == null)
- {
- removeFirstInMemory();
- }
- else
- {
- iter.remove();
- }
+ removeReference(iter);
// delivered
if (!del.isDone())
@@ -677,6 +654,7 @@
{
// No more refs in channel or only ones that don't match any selectors
if (trace) { log.trace(this + " no more refs to deliver "); }
+
break;
}
}
@@ -688,7 +666,8 @@
}
protected Delivery handleInternal(DeliveryObserver sender, MessageReference ref,
- Transaction tx, boolean persist, boolean synchronous)
+ Transaction tx, boolean persist, boolean synchronous,
+ boolean deliver)
{
if (ref == null)
{
@@ -716,12 +695,9 @@
if (tx == null)
{
// Don't even attempt synchronous delivery for a reliable message
- // when we have an
- // non-recoverable state that doesn't accept reliable messages. If
- // we do, we may get
- // into the situation where we need to reliably store an active
- // delivery of a reliable
- // message, which in these conditions cannot be done.
+ // when we have a non-recoverable state that doesn't accept reliable messages. If
+ // we do, we may get into the situation where we need to reliably store an active
+ // delivery of a reliable message, which in these conditions cannot be done.
if (ref.isReliable() && !acceptReliableMessages)
{
@@ -738,11 +714,14 @@
pm.addReference(channelID, ref, null);
}
- addReferenceInMemory(ref);
+ synchronized (refLock)
+ {
+ addReferenceInMemory(ref);
+ }
// We only do delivery if there are receivers that haven't said they don't want
// any more references.
- if (receiversReady)
+ if (receiversReady && deliver)
{
// Prompt delivery
deliverInternal(true);
@@ -765,7 +744,7 @@
else
{
// add to post commit callback
- getCallback(tx, synchronous).addRef(ref);
+ getCallback(tx, synchronous, deliver).addRef(ref);
if (trace) { log.trace(this + " added transactionally " + ref + " in memory"); }
}
@@ -793,7 +772,8 @@
return new SimpleDelivery(this, ref, true);
}
- protected void acknowledgeInternal(Delivery d, Transaction tx, boolean persist, boolean synchronous) throws Exception
+ protected void acknowledgeInternal(Delivery d, Transaction tx, boolean persist,
+ boolean synchronous) throws Exception
{
if (tx == null)
{
@@ -811,7 +791,7 @@
}
else
{
- this.getCallback(tx, synchronous).addDelivery(d);
+ this.getCallback(tx, synchronous, false).addDelivery(d);
if (trace) { log.trace(this + " added " + d + " to memory on transaction " + tx); }
@@ -839,13 +819,13 @@
return removed;
}
- protected InMemoryCallback getCallback(Transaction tx, boolean synchronous)
+ protected InMemoryCallback getCallback(Transaction tx, boolean synchronous, boolean deliver)
{
InMemoryCallback callback = (InMemoryCallback) tx.getCallback(this);
if (callback == null)
{
- callback = new InMemoryCallback(synchronous);
+ callback = new InMemoryCallback(synchronous, deliver);
tx.addCallback(callback, this);
}
@@ -856,19 +836,99 @@
{
throw new IllegalStateException("Callback synchronousness status doesn't match");
}
+ if (callback.isDeliver() != deliver)
+ {
+ throw new IllegalStateException("Callback deliver status doesn't match");
+ }
}
return callback;
}
- protected abstract boolean cancelInternal(Delivery del) throws Exception;
+ protected boolean cancelInternal(Delivery del) throws Exception
+ {
+ if (trace) { log.trace(this + " cancelling " + del + " in memory"); }
+
+ boolean removed = deliveries.remove(del);
+
+ if (!removed)
+ {
+ // This can happen if the message is cancelled before the result of
+ // ServerConsumerDelegate.handle has returned, in which case we won't have a record of the delivery
+ // In this case we don't want to add the message reference back into
+ // the state since it was never removed in the first place
+
+ if (trace) { log.trace(this + " can't find delivery " + del + " in state so not replacing messsage ref"); }
+ }
+ else
+ {
+ messageRefs.addFirst(del.getReference(), del.getReference().getPriority());
+
+ if (trace) { log.trace(this + " added " + del.getReference() + " back into state"); }
+ }
+
+ return removed;
+ }
- protected abstract MessageReference removeFirstInMemory() throws Exception;
+ protected MessageReference removeFirstInMemory() throws Exception
+ {
+ MessageReference result = (MessageReference) messageRefs.removeFirst();
+
+ return (MessageReference) result;
+ }
- protected abstract void addReferenceInMemory(MessageReference ref) throws Exception;
+ protected void addReferenceInMemory(MessageReference ref) throws Exception
+ {
+ if (ref.isReliable() && !acceptReliableMessages)
+ {
+ throw new IllegalStateException("Reliable reference " + ref +
+ " cannot be added to non-recoverable state");
+ }
+
+ messageRefs.addLast(ref, ref.getPriority());
+
+ if (trace){ log.trace(this + " added " + ref + " non-transactionally in memory"); }
+ }
// Private -------------------------------------------------------
+
+ private void expireRef(MessageReference ref, ListIterator iter) throws Exception
+ {
+ if (trace) { log.trace("Message reference: " + ref + " has expired"); }
+ // remove and acknowledge it
+ synchronized (refLock)
+ {
+ if (iter == null)
+ {
+ removeFirstInMemory();
+ }
+ else
+ {
+ iter.remove();
+ }
+ }
+
+ Delivery delivery = new SimpleDelivery(this, ref, true);
+
+ acknowledgeInternal(delivery, null, true, false);
+ }
+
+ private void removeReference(ListIterator iter) throws Exception
+ {
+ synchronized (refLock)
+ {
+ if (iter == null)
+ {
+ removeFirstInMemory();
+ }
+ else
+ {
+ iter.remove();
+ }
+ }
+ }
+
// Inner classes -------------------------------------------------
private class InMemoryCallback implements TxCallback, Runnable
@@ -879,23 +939,32 @@
private boolean synchronous;
+ private boolean deliver;
+
private boolean committing;
private Future result;
- private InMemoryCallback(boolean synchronous)
+ private InMemoryCallback(boolean synchronous, boolean deliver)
{
refsToAdd = new ArrayList();
deliveriesToRemove = new ArrayList();
this.synchronous = synchronous;
+
+ this.deliver = deliver;
}
private boolean isSynchronous()
{
return synchronous;
}
+
+ private boolean isDeliver()
+ {
+ return deliver;
+ }
private void addRef(MessageReference ref)
{
@@ -1045,7 +1114,10 @@
try
{
- addReferenceInMemory(ref);
+ synchronized (refLock)
+ {
+ addReferenceInMemory(ref);
+ }
}
catch (Throwable t)
{
@@ -1079,7 +1151,7 @@
}
//prompt delivery
- if (receiversReady)
+ if (deliver && receiversReady)
{
deliverInternal(true);
}
@@ -1178,7 +1250,7 @@
public void run()
{
- Delivery d = handleInternal(sender, ref, null, persist, false);
+ Delivery d = handleInternal(sender, ref, null, persist, false, true);
result.setResult(d);
}
}
Modified: trunk/src/main/org/jboss/messaging/core/PagingChannelSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/PagingChannelSupport.java 2006-10-12 15:47:04 UTC (rev 1472)
+++ trunk/src/main/org/jboss/messaging/core/PagingChannelSupport.java 2006-10-13 12:09:35 UTC (rev 1473)
@@ -181,6 +181,7 @@
// Public --------------------------------------------------------
+ //Only used in testing
public int downCacheCount()
{
synchronized (refLock)
@@ -189,6 +190,7 @@
}
}
+ //Only used in testing
public boolean isPaging()
{
synchronized (refLock)
@@ -364,60 +366,37 @@
}
}
}
-
+
protected boolean cancelInternal(Delivery del) throws Exception
{
if (trace) { log.trace(this + " cancelling " + del + " in memory"); }
- boolean removed;
-
- synchronized (deliveryLock)
+ boolean removed = super.cancelInternal(del);
+
+ if (removed && paging)
{
- removed = deliveries.remove(del);
- }
-
- if (!removed)
- {
- // This can happen if the message is cancelled before the result of
- // ServerConsumerDelegate.handle has returned, in which case we won't have a record of the delivery
- // In this case we don't want to add the message reference back into
- // the state since it was never removed in the first place
-
- if (trace) { log.trace(this + " can't find delivery " + del + " in state so not replacing messsage ref"); }
- }
- else
- {
- synchronized (refLock)
+ // if paging and the in memory queue is exactly full we need to evict the end reference to storage to
+ // preserve the number of refs in the queue
+ if (messageRefs.size() == fullSize + 1)
{
- messageRefs.addFirst(del.getReference(), del.getReference().getPriority());
-
- if (paging)
- {
- // if paging we need to evict the end reference to storage to
- // preserve the number of refs in the queue
-
- MessageReference ref = (MessageReference)messageRefs.removeLast();
+ MessageReference ref = (MessageReference)messageRefs.removeLast();
- addToDownCache(ref, true);
- }
+ addToDownCache(ref, true);
}
-
- if (trace) { log.trace(this + " added " + del.getReference() + " back into state"); }
}
+
+ if (trace) { log.trace(this + " added " + del.getReference() + " back into state"); }
return removed;
}
-
+
protected MessageReference removeFirstInMemory() throws Exception
{
- synchronized (refLock)
- {
- MessageReference result = (MessageReference) messageRefs.removeFirst();
+ MessageReference result = super.removeFirstInMemory();
- checkLoad();
+ checkLoad();
- return (MessageReference) result;
- }
+ return result;
}
private boolean checkLoad() throws Exception
@@ -447,36 +426,30 @@
return false;
}
}
-
+
protected void addReferenceInMemory(MessageReference ref) throws Exception
- {
- if (ref.isReliable() && !acceptReliableMessages)
+ {
+ if (paging)
{
- throw new IllegalStateException("Reliable reference " + ref +
- " cannot be added to non-recoverable state");
+ if (ref.isReliable() && !acceptReliableMessages)
+ {
+ throw new IllegalStateException("Reliable reference " + ref +
+ " cannot be added to non-recoverable state");
+ }
+ addToDownCache(ref, false);
}
-
- synchronized (refLock)
+ else
{
- if (paging)
+ super.addReferenceInMemory(ref);
+
+ if (messageRefs.size() == fullSize)
{
- addToDownCache(ref, false);
- }
- else
- {
- messageRefs.addLast(ref, ref.getPriority());
+ // We are full in memory - go into paging mode
+ if (trace) { log.trace(this + " going into paging mode"); }
- if (trace){ log.trace(this + " added " + ref + " non-transactionally in memory"); }
-
- if (messageRefs.size() == fullSize)
- {
- // We are full in memory - go into paging mode
- if (trace) { log.trace(this + " going into paging mode"); }
-
- paging = true;
- }
+ paging = true;
}
- }
+ }
}
protected void addToDownCache(MessageReference ref, boolean cancelling) throws Exception
@@ -525,8 +498,7 @@
if (trace) { log.trace(this + " flushing " + downCache.size() + " refs from downcache"); }
// Non persistent refs won't already be in the db so they need to be inserted
- // Persistent refs in a recoverable state will already be there so need to
- // be updated
+ // Persistent refs in a recoverable state will already be there so need to be updated
List toUpdate = new ArrayList();
@@ -573,9 +545,7 @@
if (trace) { log.trace(this + " cleared downcache"); }
}
-
-
-
+
// Private ------------------------------------------------------------------------------
private MessageReference addFromRefInfo(ReferenceInfo info, Map refMap)
Modified: trunk/src/main/org/jboss/messaging/core/SimpleDelivery.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/SimpleDelivery.java 2006-10-12 15:47:04 UTC (rev 1472)
+++ trunk/src/main/org/jboss/messaging/core/SimpleDelivery.java 2006-10-13 12:09:35 UTC (rev 1473)
@@ -108,7 +108,7 @@
return cancelled;
}
- public synchronized boolean isSelectorAccepted()
+ public boolean isSelectorAccepted()
{
return selectorAccepted;
}
Modified: trunk/src/main/org/jboss/messaging/core/local/RoundRobinPointToPointRouter.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/local/RoundRobinPointToPointRouter.java 2006-10-12 15:47:04 UTC (rev 1472)
+++ trunk/src/main/org/jboss/messaging/core/local/RoundRobinPointToPointRouter.java 2006-10-13 12:09:35 UTC (rev 1473)
@@ -87,7 +87,7 @@
// try to release the lock as quickly as possible and make a copy of the receivers array
// to avoid deadlock (http://jira.jboss.org/jira/browse/JBMESSAGING-491)
- //FIXME - we shouldn't be cloning an ArrayList for the delivery of each message
+ //TODO - we shouldn't be cloning an ArrayList for the delivery of each message
//on the primary execution path!
receiversCopy = new ArrayList(receivers.size());
Modified: trunk/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java 2006-10-12 15:47:04 UTC (rev 1472)
+++ trunk/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java 2006-10-13 12:09:35 UTC (rev 1473)
@@ -2970,7 +2970,7 @@
//Now set the fields from org.joss.jms.message.JBossMessage if appropriate
- //FIXME - We are mixing concerns here
+ //TODO - We are mixing concerns here
//The basic JDBCPersistencManager should *only* know about core messages - not
//JBossMessages - we should subclass JBDCPersistenceManager and the JBossMessage
//specific code in a subclass
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/CastMessagesCallback.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/CastMessagesCallback.java 2006-10-12 15:47:04 UTC (rev 1472)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/CastMessagesCallback.java 2006-10-13 12:09:35 UTC (rev 1473)
@@ -191,7 +191,6 @@
//Only used in testing
if (failAfterCommit)
{
- log.info("Forcing failure after commit");
throw new TransactionException("Forced failure for testing");
}
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java 2006-10-12 15:47:04 UTC (rev 1472)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java 2006-10-13 12:09:35 UTC (rev 1473)
@@ -900,10 +900,8 @@
if (q.isActive())
{
QueueStats stats = q.getStats();
-
- //We don't bother sending the stats if there's no significant change in the values
-
- if (q.changedSignificantly())
+
+ if (stats != null)
{
if (statsList == null)
{
@@ -988,14 +986,16 @@
RemoteQueueStub toQueue = (RemoteQueueStub)messagePullPolicy.chooseQueue(router.getQueues());
if (trace) { log.trace(this.nodeId + " recalculated pull queue for queue " + st.getQueueName() + " to be " + toQueue); }
+
+ localQueue.setPullQueue(toQueue);
- if (toQueue != null)
+ if (toQueue != null && localQueue.getRefCount() == 0)
{
- localQueue.setPullInfo(toQueue, pullSize);
+ //We now trigger delivery - this may cause a pull event
+ //We only do this if there are no refs in the local queue
- //We now trigger delivery - this may cause a pull event
localQueue.deliver(false);
-
+
if (trace) { log.trace(this.nodeId + " triggered delivery for " + localQueue.getName()); }
}
}
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultMessagePullPolicy.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultMessagePullPolicy.java 2006-10-12 15:47:04 UTC (rev 1472)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultMessagePullPolicy.java 2006-10-13 12:09:35 UTC (rev 1473)
@@ -49,13 +49,13 @@
ClusteredQueue chosenQueue = null;
int maxMessages = 0;
-
+
while (iter.hasNext())
{
ClusteredQueue queue = (ClusteredQueue)iter.next();
if (!queue.isLocal())
- {
+ {
QueueStats stats = queue.getStats();
if (stats != null)
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultRouter.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultRouter.java 2006-10-12 15:47:04 UTC (rev 1472)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultRouter.java 2006-10-13 12:09:35 UTC (rev 1473)
@@ -38,21 +38,8 @@
*
* This router always favours the local queue.
*
- * If there is no local queue it will round robin between the others.
+ * If there is no local queue, then it will round robin between the non local queues.
*
- * In the case of a distributed point to point queue deployed at each node in the cluster
- * there will always be a local queue.
- *
- * In this case, with the assumption that producers and consumers are distributed evenly across the cluster
- * then sending the message to the local queue is the most efficient policy.
- *
- * The exception to this if there are no consumers on the local queue.
- *
- * In the case of a durable subscription, there may well be no local queue since the durable subscription lives
- * only on the number of nodes that it is looked up at.
- *
- * In this case the round robin routing will kick in
- *
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @version <tt>$Revision: 1.1 $</tt>
*
@@ -66,7 +53,7 @@
private boolean trace = log.isTraceEnabled();
//MUST be an arraylist for fast index access
- private ArrayList queues;
+ private ArrayList nonLocalQueues;
private ClusteredQueue localQueue;
@@ -74,12 +61,12 @@
public DefaultRouter()
{
- queues = new ArrayList();
+ nonLocalQueues = new ArrayList();
}
public int size()
{
- return queues.size();
+ return nonLocalQueues.size() + (localQueue == null ? 0 : 1);
}
public ClusteredQueue getLocalQueue()
@@ -99,17 +86,17 @@
}
localQueue = queue;
}
+ else
+ {
+ nonLocalQueues.add(queue);
+ }
- queues.add(queue);
-
- target = 0;
-
return true;
}
public void clear()
{
- queues.clear();
+ nonLocalQueues.clear();
localQueue = null;
@@ -118,31 +105,46 @@
public boolean contains(Receiver queue)
{
- return queues.contains(queue);
+ return localQueue == queue || nonLocalQueues.contains(queue);
}
public Iterator iterator()
{
+ List queues = new ArrayList();
+
+ if (localQueue != null)
+ {
+ queues.add(localQueue);
+ }
+
+ queues.addAll(nonLocalQueues);
+
return queues.iterator();
}
public boolean remove(Receiver queue)
{
- if (queues.remove(queue))
+ if (localQueue == queue)
{
- if (localQueue == queue)
- {
- localQueue = null;
- }
+ localQueue = null;
- target = 0;
-
return true;
}
else
{
- return false;
- }
+ if (nonLocalQueues.remove(queue))
+ {
+ if (target >= nonLocalQueues.size() - 1)
+ {
+ target = nonLocalQueues.size() - 1;
+ }
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
}
public Delivery handle(DeliveryObserver observer, MessageReference reference, Transaction tx)
@@ -150,8 +152,8 @@
if (trace) { log.trace(this + " routing ref " + reference); }
//Favour the local queue
-
- if (localQueue != null && localQueue.numberOfReceivers() > 0)
+
+ if (localQueue != null)
{
//The only time the local queue won't accept is if the selector doesn't
//match - in which case it won't match at any other nodes too so no point
@@ -165,21 +167,15 @@
}
else
{
- //There is no local shared queue or the local queue has no consumers
-
+ //There is no local shared queue
//We round robin among the rest
- if ((localQueue == null && !queues.isEmpty()) || (localQueue != null && queues.size() > 1))
+
+ if (!nonLocalQueues.isEmpty())
{
- ClusteredQueue queue = (ClusteredQueue)queues.get(target);
+ ClusteredQueue queue = (ClusteredQueue)nonLocalQueues.get(target);
- if (queue == localQueue)
- {
- //We don't want to choose the local queue
- incTarget();
- }
+ queue = (ClusteredQueue)nonLocalQueues.get(target);
- queue = (ClusteredQueue)queues.get(target);
-
Delivery del = queue.handle(observer, reference, tx);
if (trace) { log.trace(this + " routed to remote queue, it returned " + del); }
@@ -200,7 +196,7 @@
{
target++;
- if (target == queues.size())
+ if (target == nonLocalQueues.size())
{
target = 0;
}
@@ -208,11 +204,13 @@
public List getQueues()
{
- return queues;
+ return nonLocalQueues;
}
public int numberOfReceivers()
{
- return queues.size();
+ return nonLocalQueues.size();
}
}
+
+
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java 2006-10-12 15:47:04 UTC (rev 1472)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java 2006-10-13 12:09:35 UTC (rev 1473)
@@ -25,6 +25,7 @@
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
+import java.util.ListIterator;
import org.jboss.logging.Logger;
import org.jboss.messaging.core.Delivery;
@@ -36,7 +37,9 @@
import org.jboss.messaging.core.plugin.contract.PersistenceManager;
import org.jboss.messaging.core.plugin.contract.PostOffice;
import org.jboss.messaging.core.tx.Transaction;
+import org.jboss.messaging.core.tx.TransactionException;
import org.jboss.messaging.core.tx.TransactionRepository;
+import org.jboss.messaging.core.tx.TxCallback;
import org.jboss.messaging.util.Future;
import org.jboss.messaging.util.StreamUtils;
@@ -62,19 +65,12 @@
private volatile int lastCount;
- private volatile boolean changedSignificantly;
+ private volatile RemoteQueueStub pullQueue;
- private RemoteQueueStub pullQueue;
-
private int nodeId;
- //TODO Make configurable
- private int pullSize;
-
private TransactionRepository tr;
- private Object pullLock = new Object();
-
//TODO - we shouldn't have to specify office AND nodeId
public LocalClusteredQueue(PostOffice office, int nodeId, String name, long id, MessageStore ms, PersistenceManager pm,
boolean acceptReliableMessages, boolean recoverable, QueuedExecutor executor,
@@ -87,7 +83,7 @@
this.tr = tr;
- //FIXME - this cast is a hack
+ //TODO - This cast is potentially unsafe - handle better
this.office = (PostOfficeInternal)office;
}
@@ -101,44 +97,39 @@
this.tr = tr;
- //FIXME - this cast is a hack
+ //TODO - This cast is potentially unsafe - handle better
this.office = (PostOfficeInternal)office;
}
- public void setPullInfo(RemoteQueueStub queue, int pullSize)
+ public void setPullQueue(RemoteQueueStub queue)
{
- synchronized (pullLock)
- {
- this.pullQueue = queue;
-
- this.pullSize = pullSize;
- }
+ this.pullQueue = queue;
}
-
+
public QueueStats getStats()
{
- int cnt = messageCount();
+ //Currently we only return the current message reference count for the channel
+ //Note we are only interested in the number of refs in the main queue, not
+ //in any deliveries
+ //Also we are only interested in the value obtained after delivery is complete.
+ //This is so we don't end up with transient values taken while delivery is half way through
+ int cnt = getRefCount();
+
if (cnt != lastCount)
{
- changedSignificantly = true;
+ lastCount = cnt;
- lastCount = cnt;
+ //We only return stats if the count has changed since last time - this is so that we only
+ //broadcast data when necessary
+ return new QueueStats(name, cnt);
}
else
{
- changedSignificantly = false;
- }
-
- return new QueueStats(name, cnt);
+ return null;
+ }
}
-
- //Have the stats changed significantly since the last time we request them?
- public boolean changedSignificantly()
- {
- return changedSignificantly;
- }
-
+
public boolean isLocal()
{
return true;
@@ -148,44 +139,17 @@
{
return nodeId;
}
-
+
/*
* Used when pulling messages from a remote queue
*/
public List getDeliveries(int number) throws Exception
{
- List dels = new ArrayList();
+ Future result = new Future();
- synchronized (refLock)
- {
- synchronized (deliveryLock)
- {
- //We only get the refs if receiversReady = false so as not to steal messages that
- //might be consumed by local receivers
- if (!receiversReady)
- {
- int count = 0;
-
- MessageReference ref;
-
- while (count < number && (ref = removeFirstInMemory()) != null)
- {
- SimpleDelivery del = new SimpleDelivery(this, ref);
-
- deliveries.add(del);
-
- dels.add(del);
-
- count++;
- }
- return dels;
- }
- else
- {
- return Collections.EMPTY_LIST;
- }
- }
- }
+ this.executor.execute(new GetDeliveriesRunnable(result, 1));
+
+ return (List)result.getResult();
}
/*
@@ -225,39 +189,25 @@
acknowledgeInternal(d, null, false, false);
}
- protected void deliverInternal(boolean handle) throws Throwable
- {
- int beforeSize = -1;
+
+ protected MessageReference nextReference(ListIterator iter, boolean handle) throws Throwable
+ {
+ MessageReference ref = super.nextReference(iter, handle);
- if (!handle)
+ if (ref == null)
{
- beforeSize = messageRefs.size();
- }
-
- super.deliverInternal(handle);
-
- if (!handle)
- {
- int afterSize = messageRefs.size();
+ //There are no available refs in the local queue
+ //Maybe we need to pull one (some) from a remote queue?
- if (trace)
+ if (pullMessages())
{
- log.trace(this + " Deciding whether to pull messages. " +
- "receiversready:" + receiversReady + " before size:" + beforeSize + " afterSize: " + afterSize);
+ ref = super.nextReference(iter, handle);
}
-
- if (receiversReady && beforeSize == 0 && afterSize == 0)
- {
- //Delivery has been prompted (not from handle call)
- //and has run, and there are consumers that are still interested in receiving more
- //refs but there are none available in the channel (either the channel is empty
- //or there are only refs that don't match any selectors)
- //then we should perhaps pull some messages from a remote queue
- pullMessages();
- }
}
+
+ return ref;
}
-
+
public boolean isClustered()
{
return true;
@@ -286,28 +236,28 @@
* On failure, commit or rollback will be called on the holding transaction causing the deliveries to be acked or cancelled
* depending on whether they exist in the database
*
+ * Recovery is handled in the same way as CastMessagesCallback
+ *
* This method will always be executed on the channel's event queue (via the deliver method)
* so no need to do any handles or acks inside another event message
*/
- private void pullMessages() throws Throwable
- {
- RemoteQueueStub theQueue;
- int thePullSize;
-
- synchronized (pullLock)
+ private boolean pullMessages() throws Throwable
+ {
+ if (pullQueue == null)
{
- if (pullQueue == null)
- {
- return;
- }
- theQueue = pullQueue;
- thePullSize = pullSize;
+ return false;
}
-
+
+ //TODO we can optimise this for the case when only one message is pulled
+ //and when only non persistent messages are pulled - i.e. we don't need
+ //to create a transaction.
+
+ RemoteQueueStub theQueue = pullQueue;
+
Transaction tx = tr.createTransaction();
ClusterRequest req = new PullMessagesRequest(this.nodeId, tx.getId(), theQueue.getChannelID(),
- name, thePullSize);
+ name, 1);
if (trace)
{
@@ -315,12 +265,14 @@
" pulling from node " + theQueue.getNodeId() + " to node " + this.nodeId);
}
+ log.info("==================== Executing pull messages request");
byte[] bytes = (byte[])office.syncSendRequest(req, theQueue.getNodeId(), true);
+ log.info("==================== Executed pull messages request");
if (bytes == null)
{
//Ok - node might have left the group
- return;
+ return false;
}
PullMessagesResponse response = new PullMessagesResponse();
@@ -346,20 +298,24 @@
containsReliable = true;
}
-
+
MessageReference ref = null;
try
{
ref = ms.reference(msg);
- Delivery delRet = handleInternal(null, ref, tx, true, true);
+ //It's ok to call this directly since this method is only ever called by the delivery thread
+ //We call it with the deliver parameter set to false - this prevents delivery being done
+ //after the ref is added - if delivery was done we would end up in recursion.
+ Delivery delRet = handleInternal(null, ref, tx, true, true, false);
if (delRet == null || !delRet.isSelectorAccepted())
{
//This should never happen
- throw new IllegalStateException("Aaarrgg queue did not accept reference");
+ throw new IllegalStateException("Queue did not accept reference!");
}
+
}
finally
{
@@ -391,7 +347,165 @@
req = new PullMessagesRequest(this.nodeId, tx.getId());
office.asyncSendRequest(req, theQueue.getNodeId());
+ }
+
+ return !msgs.isEmpty();
+ }
+
+ public int getRefCount()
+ {
+ //We are only interested in getting the reference count when delivery is not in progress
+ //since we don't want spurious, transient mid-delivery values, so we execute the request
+ //on the same thread that performs delivery (via the channel's executor).
+
+ Future result = new Future();
+
+ try
+ {
+ this.executor.execute(new GetRefCountRunnable(result));
}
+ catch (InterruptedException e)
+ {
+ log.warn("Thread interrupted", e);
+ }
+
+ return ((Integer)result.getResult()).intValue();
+ }
+
+ private class GetRefCountRunnable implements Runnable
+ {
+ Future result;
+ public GetRefCountRunnable(Future result)
+ {
+ this.result = result;
+ }
+
+ public void run()
+ {
+ int refCount = messageRefs.size();
+
+ result.setResult(new Integer(refCount));
+ }
+ }
+
+ private class GetDeliveriesRunnable implements Runnable
+ {
+ Future result;
+
+ int number;
+
+ public GetDeliveriesRunnable(Future result, int number)
+ {
+ this.result = result;
+
+ this.number = number;
+ }
+
+ public void run()
+ {
+ try
+ {
+ List list = null;
+
+ //We only get the refs if receiversReady = false so as not to steal messages that
+ //might be consumed by local receivers
+ if (!receiversReady)
+ {
+ int count = 0;
+
+ MessageReference ref;
+
+ list = new ArrayList();
+
+ synchronized (refLock)
+ {
+ synchronized (deliveryLock)
+ {
+ while (count < number && (ref = removeFirstInMemory()) != null)
+ {
+ SimpleDelivery del = new SimpleDelivery(LocalClusteredQueue.this, ref);
+
+ deliveries.add(del);
+
+ list.add(del);
+
+ count++;
+ }
+ }
+ }
+ }
+ else
+ {
+ list = Collections.EMPTY_LIST;
+ }
+
+ result.setResult(list);
+ }
+ catch (Exception e)
+ {
+ result.setException(e);
+ }
+ }
+ }
+
+ private class AddReferencesCallback implements TxCallback
+ {
+ private List references;
+
+ private AddReferencesCallback(List references)
+ {
+ this.references = references;
+ }
+
+ public void afterCommit(boolean onePhase) throws Exception
+ {
+ Iterator iter = references.iterator();
+
+ while (iter.hasNext())
+ {
+ MessageReference ref = (MessageReference) iter.next();
+
+ if (trace) { log.trace(this + ": adding " + ref + " to non-recoverable state"); }
+
+ try
+ {
+ synchronized (refLock)
+ {
+ addReferenceInMemory(ref);
+ }
+ }
+ catch (Throwable t)
+ {
+ throw new TransactionException("Failed to add reference", t);
+ }
+ }
+ }
+
+ public void afterPrepare() throws Exception
+ {
+ //NOOP
+ }
+
+ public void afterRollback(boolean onePhase) throws Exception
+ {
+ //NOOP
+ }
+
+ public void beforeCommit(boolean onePhase) throws Exception
+ {
+ //NOOP
+ }
+
+ public void beforePrepare() throws Exception
+ {
+ //NOOP
+ }
+
+ public void beforeRollback(boolean onePhase) throws Exception
+ {
+ //NOOP
+ }
+
}
}
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PostOfficeInternal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PostOfficeInternal.java 2006-10-12 15:47:04 UTC (rev 1472)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PostOfficeInternal.java 2006-10-13 12:09:35 UTC (rev 1473)
@@ -69,6 +69,4 @@
void sendQueueStats() throws Exception;
boolean referenceExistsInStorage(long channelID, long messageID) throws Exception;
-
- List getDeliveries(String queueName, int numMessages) throws Exception;
}
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesRequest.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesRequest.java 2006-10-12 15:47:04 UTC (rev 1472)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesRequest.java 2006-10-13 12:09:35 UTC (rev 1473)
@@ -29,6 +29,7 @@
import org.jboss.logging.Logger;
import org.jboss.messaging.core.Delivery;
+import org.jboss.messaging.core.plugin.postoffice.Binding;
import org.jboss.messaging.util.StreamUtils;
/**
@@ -80,8 +81,17 @@
if (hold)
{
- List dels = office.getDeliveries(queueName, numMessages);
+ Binding binding = office.getBindingForQueueName(queueName);
+ if (binding == null)
+ {
+ throw new IllegalStateException("Cannot find binding for queue: " + queueName);
+ }
+
+ LocalClusteredQueue queue = (LocalClusteredQueue)binding.getQueue();
+
+ List dels = queue.getDeliveries(numMessages);
+
if (trace) { log.trace("PullMessagesRequest got " + dels.size() + " deliveries"); }
PullMessagesResponse response = new PullMessagesResponse(dels.size());
@@ -97,7 +107,7 @@
//Add it to internal list
if (reliableDels == null)
{
- reliableDels = new ArrayList();
+ reliableDels = new ArrayList();
}
reliableDels.add(del);
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/StatsSender.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/StatsSender.java 2006-10-12 15:47:04 UTC (rev 1472)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/StatsSender.java 2006-10-13 12:09:35 UTC (rev 1473)
@@ -98,6 +98,7 @@
class SendStatsTimerTask extends TimerTask
{
private boolean stopping;
+
private boolean stopped;
private Object stopLock = new Object();
@@ -125,8 +126,11 @@
if (stopping)
{
cancel();
+
stopped = true;
+
stopLock.notify();
+
return;
}
}
Modified: trunk/tests/src/org/jboss/test/messaging/core/SimpleReceiver.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/SimpleReceiver.java 2006-10-12 15:47:04 UTC (rev 1472)
+++ trunk/tests/src/org/jboss/test/messaging/core/SimpleReceiver.java 2006-10-13 12:09:35 UTC (rev 1473)
@@ -61,6 +61,7 @@
public static final String BROKEN = "BROKEN";
public static final String REJECTING = "REJECTING";
public static final String SELECTOR_REJECTING = "SELECTOR_REJECTING";
+ public static final String ACCEPTING_TO_MAX = "ACCEPTING_TO_MAX";
private static final String INVOCATION_COUNT = "INVOCATION_COUNT";
@@ -77,6 +78,7 @@
private int invocationsToFutureStateCount;
private Map waitingArea;
private boolean immediateAsynchronousAcknowledgment;
+ private int maxRefs;
// Constructors --------------------------------------------------
@@ -102,6 +104,7 @@
{
this(name, state, null);
}
+
public SimpleReceiver(String name, String state, Channel channel)
{
@@ -142,8 +145,16 @@
log.trace(this + " is rejecting reference " + ref);
return null;
}
+
+ if (ACCEPTING_TO_MAX.equals(state))
+ {
+ //Only accept up to maxRefs references
+ if (messages.size() == maxRefs)
+ {
+ return null;
+ }
+ }
-
if (BROKEN.equals(state))
{
throw new RuntimeException("THIS IS AN EXCEPTION THAT SIMULATES "+
@@ -192,6 +203,11 @@
}
// Public --------------------------------------------------------
+
+ public void setMaxRefs(int max)
+ {
+ this.maxRefs = max;
+ }
public void setImmediateAsynchronousAcknowledgment(boolean b)
{
@@ -392,7 +408,8 @@
!ACCEPTING.equals(state) &&
!BROKEN.equals(state) &&
!REJECTING.equals(state) &&
- !SELECTOR_REJECTING.equals(state))
+ !SELECTOR_REJECTING.equals(state) &&
+ !ACCEPTING_TO_MAX.equals(state))
{
throw new IllegalArgumentException("Unknown receiver state: " + state);
}
Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeWithDefaultRouterTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeWithDefaultRouterTest.java 2006-10-12 15:47:04 UTC (rev 1472)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeWithDefaultRouterTest.java 2006-10-13 12:09:35 UTC (rev 1473)
@@ -95,18 +95,6 @@
local(false);
}
-
- public void testLocalNonConsumersPersistent() throws Throwable
- {
- localNoConsumers(true);
- }
-
- public void testLocalNoConsumersNonPersistent() throws Throwable
- {
- localNoConsumers(false);
- }
-
-
protected void notLocal(boolean persistent) throws Throwable
{
ClusteredPostOffice office1 = null;
@@ -245,150 +233,9 @@
}
}
- //if the local queue has no consumers then we treat as if there was no local queue
- protected void localNoConsumers(boolean persistent) throws Throwable
- {
- ClusteredPostOffice office1 = null;
-
- ClusteredPostOffice office2 = null;
-
- ClusteredPostOffice office3 = null;
-
- ClusteredPostOffice office4 = null;
-
- ClusteredPostOffice office5 = null;
-
- ClusteredPostOffice office6 = null;
-
- try
- {
- office1 = createClusteredPostOffice(1, "testgroup");
-
- office2 = createClusteredPostOffice(2, "testgroup");
-
- office3 = createClusteredPostOffice(3, "testgroup");
-
- office4 = createClusteredPostOffice(4, "testgroup");
-
- office5 = createClusteredPostOffice(5, "testgroup");
-
- office6 = createClusteredPostOffice(6, "testgroup");
-
- LocalClusteredQueue queueLocal = new LocalClusteredQueue(office1, 1, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
- Binding bindingLocal = office1.bindClusteredQueue("topic", queueLocal);
- //No consumer on the local one
-
- LocalClusteredQueue queue1 = new LocalClusteredQueue(office2, 2, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
- Binding binding1 = office2.bindClusteredQueue("topic", queue1);
- SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
- queue1.add(receiver1);
-
- LocalClusteredQueue queue2 = new LocalClusteredQueue(office3, 3, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
- Binding binding2 = office3.bindClusteredQueue("topic", queue2);
- SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
- queue2.add(receiver2);
-
- LocalClusteredQueue queue3 = new LocalClusteredQueue(office4, 4, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
- Binding binding3 = office4.bindClusteredQueue("topic", queue3);
- SimpleReceiver receiver3 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
- queue3.add(receiver3);
-
- LocalClusteredQueue queue4 = new LocalClusteredQueue(office5, 5, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
- Binding binding4 = office5.bindClusteredQueue("topic", queue4);
- SimpleReceiver receiver4 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
- queue4.add(receiver4);
-
- LocalClusteredQueue queue5 = new LocalClusteredQueue(office6, 6, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
- Binding binding5 = office6.bindClusteredQueue("topic", queue5);
- SimpleReceiver receiver5 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
- queue5.add(receiver5);
-
- List msgs = sendMessages("topic", persistent, office1, 1, null);
- checkContainsAndAcknowledge(msgs, receiver1, queue1);
- checkEmpty(receiver2);
- checkEmpty(receiver3);
- checkEmpty(receiver4);
- checkEmpty(receiver5);
-
- msgs = sendMessages("topic", persistent, office1, 1, null);
- checkEmpty(receiver1);
- checkContainsAndAcknowledge(msgs, receiver2, queue1);
- checkEmpty(receiver3);
- checkEmpty(receiver4);
- checkEmpty(receiver5);
-
- msgs = sendMessages("topic", persistent, office1, 1, null);
- checkEmpty(receiver1);
- checkEmpty(receiver2);
- checkContainsAndAcknowledge(msgs, receiver3, queue1);
- checkEmpty(receiver4);
- checkEmpty(receiver5);
-
- msgs = sendMessages("topic", persistent, office1, 1, null);
- checkEmpty(receiver1);
- checkEmpty(receiver2);
- checkEmpty(receiver3);
- checkContainsAndAcknowledge(msgs, receiver4, queue1);
- checkEmpty(receiver5);
-
- msgs = sendMessages("topic", persistent, office1, 1, null);
- checkEmpty(receiver1);
- checkEmpty(receiver2);
- checkEmpty(receiver3);
- checkEmpty(receiver4);
- checkContainsAndAcknowledge(msgs, receiver5, queue1);
-
- msgs = sendMessages("topic", persistent, office1, 1, null);
- checkContainsAndAcknowledge(msgs, receiver1, queue1);
- checkEmpty(receiver2);
- checkEmpty(receiver3);
- checkEmpty(receiver4);
- checkEmpty(receiver5);
-
- msgs = sendMessages("topic", persistent, office1, 1, null);
- checkEmpty(receiver1);
- checkContainsAndAcknowledge(msgs, receiver2, queue1);
- checkEmpty(receiver3);
- checkEmpty(receiver4);
- checkEmpty(receiver5);
-
-
- }
- finally
- {
- if (office1 != null)
- {
- office1.stop();
- }
-
- if (office2 != null)
- {
- office2.stop();
- }
-
- if (office3 != null)
- {
- office3.stop();
- }
-
- if (office4 != null)
- {
- office4.stop();
- }
-
- if (office5 != null)
- {
- office5.stop();
- }
-
- if (office6 != null)
- {
- office6.stop();
- }
- }
- }
+
protected void local(boolean persistent) throws Throwable
{
ClusteredPostOffice office1 = null;
Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultRouterTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultRouterTest.java 2006-10-12 15:47:04 UTC (rev 1472)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultRouterTest.java 2006-10-13 12:09:35 UTC (rev 1473)
@@ -21,6 +21,7 @@
*/
package org.jboss.test.messaging.core.plugin.postoffice.cluster;
+import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
@@ -30,6 +31,7 @@
import org.jboss.messaging.core.FilterFactory;
import org.jboss.messaging.core.Message;
import org.jboss.messaging.core.MessageReference;
+import org.jboss.messaging.core.Queue;
import org.jboss.messaging.core.Receiver;
import org.jboss.messaging.core.SimpleDelivery;
import org.jboss.messaging.core.plugin.contract.ClusteredPostOffice;
@@ -85,8 +87,8 @@
super.tearDown();
}
- // The router only has a local queue with a consumer
- public void testRouterOnlyLocalWithConsumer() throws Exception
+ // The router only has a local queue
+ public void testRouterOnlyLocal() throws Exception
{
DefaultRouter dr = new DefaultRouter();
@@ -105,25 +107,6 @@
sendAndCheck(dr, receiver1);
}
- //The router only has a local queue with no consumer
- public void testRouterOnlyLocalNoConsumer() throws Exception
- {
- DefaultRouter dr = new DefaultRouter();
-
- ClusteredQueue queue = new SimpleQueue(true);
-
- dr.add(queue);
-
- Message msg = CoreMessageFactory.createCoreMessage(0, false, null);
-
- MessageReference ref = ms.reference(msg);
-
- Delivery del = dr.handle(null, ref, null);
-
- assertNull(del);
-
- }
-
//The router has only one non local queues
public void testRouterOnlyOneNonLocal() throws Exception
{
@@ -189,8 +172,8 @@
}
- // The router has one local with consumer and one non local queue
- public void testRouterOneLocalWithConsumerOneNonLocal() throws Exception
+ // The router has one local with consumer and one non local queue with consumer
+ public void testRouterOneLocalOneNonLocal() throws Exception
{
DefaultRouter dr = new DefaultRouter();
@@ -217,8 +200,8 @@
sendAndCheck(dr, receiver2);
}
- // The router has multiple non local queues and one local queue with consumer
- public void testRouterMultipleNonLocalOneLocalNoConsumer() throws Exception
+ // The router has multiple non local queues with consumers and one local queue
+ public void testRouterMultipleNonLocalOneLocal() throws Exception
{
DefaultRouter dr = new DefaultRouter();
@@ -265,85 +248,37 @@
sendAndCheck(dr, receiver4);
}
- // The router has multiple non local queues and one local queue without consumer
- public void testRouterMultipleNonLocalOneLocalWithConsumer() throws Exception
+ private long nextId;
+
+ private void sendAndCheck(ClusterRouter router, SimpleReceiver receiver) throws Exception
{
- DefaultRouter dr = new DefaultRouter();
-
- ClusteredQueue remote1 = new SimpleQueue(false);
+ Message msg = CoreMessageFactory.createCoreMessage(nextId++, false, null);
- SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+ MessageReference ref = ms.reference(msg);
- remote1.add(receiver1);
+ Delivery del = router.handle(null, ref, null);
- dr.add(remote1);
+ assertNotNull(del);
+ assertTrue(del.isSelectorAccepted());
+
+ Thread.sleep(250);
- ClusteredQueue remote2 = new SimpleQueue(false);
+ List msgs = receiver.getMessages();
- SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+ assertNotNull(msgs);
- remote2.add(receiver2);
+ assertEquals(1, msgs.size());
- dr.add(remote2);
+ Message msgRec = (Message)msgs.get(0);
+ assertTrue(msg == msgRec);
- ClusteredQueue remote3 = new SimpleQueue(false);
-
- SimpleReceiver receiver3 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
-
- remote3.add(receiver3);
-
- dr.add(remote3);
-
-
- ClusteredQueue queue = new SimpleQueue(true);
-
-
- dr.add(queue);
-
-
- sendAndCheck(dr, receiver1);
-
- sendAndCheck(dr, receiver2);
-
- sendAndCheck(dr, receiver3);
-
- sendAndCheck(dr, receiver1);
-
- sendAndCheck(dr, receiver2);
-
- sendAndCheck(dr, receiver3);
+ receiver.clear();
}
- // The router has one local without consumer and one non local queue
- public void testRouterMultipleOneLocalWithoutConsumerOneNonLocal() throws Exception
+ private void sendAndCheck(ClusterRouter router, Queue queue) throws Throwable
{
- DefaultRouter dr = new DefaultRouter();
-
- ClusteredQueue remote1 = new SimpleQueue(false);
-
- SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
-
- remote1.add(receiver1);
-
- dr.add(remote1);
-
- ClusteredQueue queue = new SimpleQueue(true);
-
- dr.add(queue);
-
- sendAndCheck(dr, receiver1);
-
- sendAndCheck(dr, receiver1);
-
- sendAndCheck(dr, receiver1);
- }
-
- private long nextId;
-
- private void sendAndCheck(ClusterRouter router, SimpleReceiver receiver) throws Exception
- {
Message msg = CoreMessageFactory.createCoreMessage(nextId++, false, null);
MessageReference ref = ms.reference(msg);
@@ -356,7 +291,7 @@
Thread.sleep(250);
- List msgs = receiver.getMessages();
+ List msgs = queue.browse();
assertNotNull(msgs);
@@ -366,7 +301,7 @@
assertTrue(msg == msgRec);
- receiver.clear();
+ queue.removeAllReferences();
}
@@ -402,6 +337,8 @@
private boolean local;
private Receiver receiver;
+
+ private List refs = new ArrayList();
SimpleQueue(boolean local)
{
@@ -457,8 +394,18 @@
public List browse()
{
- // TODO Auto-generated method stub
- return null;
+ List msgs = new ArrayList();
+
+ Iterator iter = refs.iterator();
+
+ while (iter.hasNext())
+ {
+ MessageReference ref = (MessageReference)iter.next();
+
+ msgs.add(ref);
+ }
+
+ return msgs;
}
public List browse(Filter filter)
@@ -549,12 +496,21 @@
{
if (receiver != null)
{
+ //Send to receiver
+
Delivery del = receiver.handle(observer, reference, tx);
return del;
}
+ else
+ {
+ //Store internally
+ refs.add(reference);
+
+ return new SimpleDelivery(observer, reference);
+ }
- return new SimpleDelivery(observer, reference);
+
}
public void acknowledge(Delivery d, Transaction tx) throws Throwable
Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RecoveryTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RecoveryTest.java 2006-10-12 15:47:04 UTC (rev 1472)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RecoveryTest.java 2006-10-13 12:09:35 UTC (rev 1473)
@@ -86,12 +86,16 @@
DefaultClusteredPostOffice office2 = null;
+ DefaultClusteredPostOffice office3 = null;
+
try
{
office1 = (DefaultClusteredPostOffice)createClusteredPostOffice(1, "testgroup");
office2 = (DefaultClusteredPostOffice)createClusteredPostOffice(2, "testgroup");
+ office3 = (DefaultClusteredPostOffice)createClusteredPostOffice(3, "testgroup");
+
LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, 1, "queue1", channelIdManager.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
Binding binding1 =
office1.bindClusteredQueue("topic1", queue1);
@@ -100,10 +104,16 @@
Binding binding2 =
office2.bindClusteredQueue("topic1", queue2);
+ LocalClusteredQueue queue3 = new LocalClusteredQueue(office3, 3, "queue3", channelIdManager.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
+ Binding binding3 =
+ office3.bindClusteredQueue("topic1", queue3);
+
SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
queue1.add(receiver1);
SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
queue2.add(receiver2);
+ SimpleReceiver receiver3 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+ queue3.add(receiver3);
//This will make it fail after casting but before persisting the message in the db
office1.setFail(true, false);
@@ -130,6 +140,9 @@
msgs = receiver2.getMessages();
assertTrue(msgs.isEmpty());
+ msgs = receiver3.getMessages();
+ assertTrue(msgs.isEmpty());
+
try
{
//An exception should be thrown
@@ -149,7 +162,16 @@
msgs = receiver2.getMessages();
assertTrue(msgs.isEmpty());
- //We now kill the office - this should make the other office do it's transaction check
+ msgs = receiver3.getMessages();
+ assertTrue(msgs.isEmpty());
+
+ assertEquals(1, office1.getHoldingTransactions().size());
+
+ assertEquals(1, office2.getHoldingTransactions().size());
+
+ assertEquals(1, office3.getHoldingTransactions().size());
+
+ //We now kill the office - this should make the other offices do their transaction check
office1.stop();
Thread.sleep(1000);
@@ -158,12 +180,17 @@
assertTrue(office2.getHoldingTransactions().isEmpty());
+ assertTrue(office3.getHoldingTransactions().isEmpty());
+
//The tx should be removed from the holding area and nothing should be received
//remember node1 has now crashed so no point checking receiver1
msgs = receiver2.getMessages();
assertTrue(msgs.isEmpty());
+ msgs = receiver3.getMessages();
+ assertTrue(msgs.isEmpty());
+
}
finally
{
@@ -176,6 +203,11 @@
{
office2.stop();
}
+
+ if (office3!= null)
+ {
+ office3.stop();
+ }
}
}
@@ -185,12 +217,16 @@
DefaultClusteredPostOffice office2 = null;
+ DefaultClusteredPostOffice office3 = null;
+
try
{
office1 = (DefaultClusteredPostOffice)createClusteredPostOffice(1, "testgroup");
office2 = (DefaultClusteredPostOffice)createClusteredPostOffice(2, "testgroup");
+ office3 = (DefaultClusteredPostOffice)createClusteredPostOffice(3, "testgroup");
+
LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, 1, "queue1", channelIdManager.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
Binding binding1 =
office1.bindClusteredQueue("topic1", queue1);
@@ -199,10 +235,16 @@
Binding binding2 =
office2.bindClusteredQueue("topic1", queue2);
+ LocalClusteredQueue queue3 = new LocalClusteredQueue(office3, 3, "queue3", channelIdManager.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
+ Binding binding3 =
+ office3.bindClusteredQueue("topic1", queue3);
+
SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
queue1.add(receiver1);
SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
queue2.add(receiver2);
+ SimpleReceiver receiver3 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+ queue3.add(receiver3);
//This will make it fail after casting and persisting the message in the db
office1.setFail(false, true);
@@ -248,6 +290,15 @@
msgs = receiver2.getMessages();
assertTrue(msgs.isEmpty());
+ msgs = receiver3.getMessages();
+ assertTrue(msgs.isEmpty());
+
+ assertEquals(1, office1.getHoldingTransactions().size());
+
+ assertEquals(1, office2.getHoldingTransactions().size());
+
+ assertEquals(1, office3.getHoldingTransactions().size());
+
//We now kill the office - this should make the other office do it's transaction check
office1.stop();
@@ -257,12 +308,17 @@
assertTrue(office2.getHoldingTransactions().isEmpty());
+ assertTrue(office3.getHoldingTransactions().isEmpty());
+
//The tx should be removed from the holding area and messages be received
//no point checking receiver1 since node1 has crashed
msgs = receiver2.getMessages();
assertEquals(NUM_MESSAGES, msgs.size());
+ msgs = receiver3.getMessages();
+ assertEquals(NUM_MESSAGES, msgs.size());
+
}
finally
{
Deleted: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RedistributionTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RedistributionTest.java 2006-10-12 15:47:04 UTC (rev 1472)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RedistributionTest.java 2006-10-13 12:09:35 UTC (rev 1473)
@@ -1,422 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.test.messaging.core.plugin.postoffice.cluster;
-
-import org.jboss.messaging.core.Delivery;
-import org.jboss.messaging.core.DeliveryObserver;
-import org.jboss.messaging.core.FilterFactory;
-import org.jboss.messaging.core.MessageReference;
-import org.jboss.messaging.core.Receiver;
-import org.jboss.messaging.core.SimpleDelivery;
-import org.jboss.messaging.core.plugin.contract.ClusteredPostOffice;
-import org.jboss.messaging.core.plugin.postoffice.Binding;
-import org.jboss.messaging.core.plugin.postoffice.cluster.ClusterRouterFactory;
-import org.jboss.messaging.core.plugin.postoffice.cluster.DefaultClusteredPostOffice;
-import org.jboss.messaging.core.plugin.postoffice.cluster.DefaultMessagePullPolicy;
-import org.jboss.messaging.core.plugin.postoffice.cluster.DefaultRouterFactory;
-import org.jboss.messaging.core.plugin.postoffice.cluster.LocalClusteredQueue;
-import org.jboss.messaging.core.plugin.postoffice.cluster.MessagePullPolicy;
-import org.jboss.messaging.core.tx.Transaction;
-import org.jboss.test.messaging.core.SimpleFilterFactory;
-import org.jboss.test.messaging.core.plugin.base.ClusteringTestBase;
-
-import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
-
-
-public class RedistributionTest extends ClusteringTestBase
-{
- // Constants -----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- public RedistributionTest(String name)
- {
- super(name);
- }
-
- // Public --------------------------------------------------------
-
- public void setUp() throws Exception
- {
- super.setUp();
- }
-
- public void tearDown() throws Exception
- {
- super.tearDown();
- }
-
- public void testRedistNonPersistentNonRecoverable() throws Throwable
- {
- redistTest(false, false);
- }
-
- public void testRedistPersistentNonRecoverable() throws Throwable
- {
- redistTest(true, false);
- }
-
- public void testRedistNonPersistentRecoverable() throws Throwable
- {
- redistTest(false, true);
- }
-
- public void testRedistPersistentRecoverable() throws Throwable
- {
- redistTest(true, true);
- }
-
- public void redistTest(boolean persistent, boolean recoverable) throws Throwable
- {
- ClusteredPostOffice office1 = null;
-
- ClusteredPostOffice office2 = null;
-
- ClusteredPostOffice office3 = null;
-
- ClusteredPostOffice office4 = null;
-
- ClusteredPostOffice office5 = null;
-
- try
- {
- office1 = createClusteredPostOffice(1, "testgroup");
-
- office2 = createClusteredPostOffice(2, "testgroup");
-
- office3 = createClusteredPostOffice(3, "testgroup");
-
- office4 = createClusteredPostOffice(4, "testgroup");
-
- office5 = createClusteredPostOffice(5, "testgroup");
-
- log.info("Started offices");
-
- LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, 1, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);
- Binding binding1 = office1.bindClusteredQueue("queue1", queue1);
-
- LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, 2, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);
- Binding binding2 = office2.bindClusteredQueue("queue1", queue2);
-
- LocalClusteredQueue queue3 = new LocalClusteredQueue(office3, 3, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);
- Binding binding3 = office3.bindClusteredQueue("queue1", queue3);
-
- LocalClusteredQueue queue4 = new LocalClusteredQueue(office4, 4, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);
- Binding binding4 = office4.bindClusteredQueue("queue1", queue4);
-
- LocalClusteredQueue queue5 = new LocalClusteredQueue(office5, 5, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);
- Binding binding5 = office5.bindClusteredQueue("queue1", queue5);
-
- log.info("bound queues");
-
- //Send 30 messages to each queue
- this.sendMessages("queue1", persistent, office1, 30, null);
- this.sendMessages("queue1", persistent, office2, 30, null);
- this.sendMessages("queue1", persistent, office3, 30, null);
- this.sendMessages("queue1", persistent, office4, 30, null);
- this.sendMessages("queue1", persistent, office5, 30, null);
-
- log.info("sent messages");
-
- Thread.sleep(1000);
-
- //Check the sizes
-
- assertEquals(30, queue1.memoryRefCount());
- assertEquals(0, queue1.memoryDeliveryCount());
-
- assertEquals(30, queue2.memoryRefCount());
- assertEquals(0, queue2.memoryDeliveryCount());
-
- assertEquals(30, queue3.memoryRefCount());
- assertEquals(0, queue3.memoryDeliveryCount());
-
- assertEquals(30, queue4.memoryRefCount());
- assertEquals(0, queue4.memoryDeliveryCount());
-
- assertEquals(30, queue5.memoryRefCount());
- assertEquals(0, queue5.memoryDeliveryCount());
-
- //Now we add the receivers
- //Note that we did not do this before the send.
- //If we had done so then it's likely that the automatic redistribution
- //would have moved some around and there wouldn't be 30 in each queue
-
- PullingReceiver receiver1 = new PullingReceiver();
- queue1.add(receiver1);
-
- PullingReceiver receiver2 = new PullingReceiver();
- queue2.add(receiver2);
-
- PullingReceiver receiver3 = new PullingReceiver();
- queue3.add(receiver3);
-
- PullingReceiver receiver4 = new PullingReceiver();
- queue4.add(receiver4);
-
- PullingReceiver receiver5 = new PullingReceiver();
- queue5.add(receiver5);
-
- log.info("Added receivers");
-
- //Prompt delivery so a message pops into each receiver
- queue1.deliver(true);
- queue2.deliver(true);
- queue3.deliver(true);
- queue4.deliver(true);
- queue5.deliver(true);
-
- Thread.sleep(1000);
-
- //Now we check the sizes again in case automatic balancing has erroneously
- //kicked in
-
- assertEquals(29, queue1.memoryRefCount());
- assertEquals(1, queue1.memoryDeliveryCount());
-
- assertEquals(29, queue2.memoryRefCount());
- assertEquals(1, queue2.memoryDeliveryCount());
-
- assertEquals(29, queue3.memoryRefCount());
- assertEquals(1, queue3.memoryDeliveryCount());
-
- assertEquals(29, queue4.memoryRefCount());
- assertEquals(1, queue4.memoryDeliveryCount());
-
- assertEquals(29, queue5.memoryRefCount());
- assertEquals(1, queue5.memoryDeliveryCount());
-
- Thread.sleep(5000);
-
- //And again - should still be no redistribution
-
- assertEquals(29, queue1.memoryRefCount());
- assertEquals(1, queue1.memoryDeliveryCount());
-
- assertEquals(29, queue2.memoryRefCount());
- assertEquals(1, queue2.memoryDeliveryCount());
-
- assertEquals(29, queue3.memoryRefCount());
- assertEquals(1, queue3.memoryDeliveryCount());
-
- assertEquals(29, queue4.memoryRefCount());
- assertEquals(1, queue4.memoryDeliveryCount());
-
- assertEquals(29, queue5.memoryRefCount());
- assertEquals(1, queue5.memoryDeliveryCount());
-
- Thread.sleep(2000);
-
- log.info("Here are the sizes:");
- log.info("queue1, refs:" + queue1.memoryRefCount() + " dels:" + queue1.memoryDeliveryCount());
- log.info("queue2, refs:" + queue2.memoryRefCount() + " dels:" + queue2.memoryDeliveryCount());
- log.info("queue3, refs:" + queue3.memoryRefCount() + " dels:" + queue3.memoryDeliveryCount());
- log.info("queue4, refs:" + queue4.memoryRefCount() + " dels:" + queue4.memoryDeliveryCount());
- log.info("queue5, refs:" + queue5.memoryRefCount() + " dels:" + queue5.memoryDeliveryCount());
-
- log.info("trying to consume");
-
- //So we have 150 messages in total - 30 on each node.
-
- //If redistribution works ok, we should be able to do something like the following:
-
- //Consume 10 on node 1
-
- //Consume 50 on node 2
-
- //Consume 75 on node 3
-
- //Consume 10 on node 4
-
- //We leave the last 5 since they will be as deliveries in the receivers probably
-
- Delivery del;
-
- log.info("consuming queue1");
- for (int i = 0; i < 10; i++)
- {
- queue1.deliver(true);
- del = receiver1.getDelivery();
- log.info("Got delivery: " + del.getReference().getMessageID());
- del.acknowledge(null);
- }
- log.info("consumed queue1");
-
- log.info("Here are the sizes:");
-
- log.info("queue1, refs:" + queue1.memoryRefCount() + " dels:" + queue1.memoryDeliveryCount());
- log.info("queue2, refs:" + queue2.memoryRefCount() + " dels:" + queue2.memoryDeliveryCount());
- log.info("queue3, refs:" + queue3.memoryRefCount() + " dels:" + queue3.memoryDeliveryCount());
- log.info("queue4, refs:" + queue4.memoryRefCount() + " dels:" + queue4.memoryDeliveryCount());
- log.info("queue5, refs:" + queue5.memoryRefCount() + " dels:" + queue5.memoryDeliveryCount());
-
- log.info("consuming queue2");
- for (int i = 0; i < 50; i++)
- {
- queue2.deliver(true);
- del = receiver2.getDelivery();
- log.info("Got delivery: " + del.getReference().getMessageID());
- del.acknowledge(null);
- }
- log.info("consumed queue2");
-
- log.info("Here are the sizes:");
- log.info("queue1, refs:" + queue1.memoryRefCount() + " dels:" + queue1.memoryDeliveryCount());
- log.info("queue2, refs:" + queue2.memoryRefCount() + " dels:" + queue2.memoryDeliveryCount());
- log.info("queue3, refs:" + queue3.memoryRefCount() + " dels:" + queue3.memoryDeliveryCount());
- log.info("queue4, refs:" + queue4.memoryRefCount() + " dels:" + queue4.memoryDeliveryCount());
- log.info("queue5, refs:" + queue5.memoryRefCount() + " dels:" + queue5.memoryDeliveryCount());
-
- log.info("consuming queue3");
- for (int i = 0; i < 75; i++)
- {
- queue3.deliver(true);
- del = receiver3.getDelivery();
- log.info("Got delivery: " + del.getReference().getMessageID());
- del.acknowledge(null);
- }
- log.info("consumed queue3");
-
- log.info("Here are the sizes:");
- log.info("queue1, refs:" + queue1.memoryRefCount() + " dels:" + queue1.memoryDeliveryCount());
- log.info("queue2, refs:" + queue2.memoryRefCount() + " dels:" + queue2.memoryDeliveryCount());
- log.info("queue3, refs:" + queue3.memoryRefCount() + " dels:" + queue3.memoryDeliveryCount());
- log.info("queue4, refs:" + queue4.memoryRefCount() + " dels:" + queue4.memoryDeliveryCount());
- log.info("queue5, refs:" + queue5.memoryRefCount() + " dels:" + queue5.memoryDeliveryCount());
-
- log.info("consuming queue4");
- for (int i = 0; i < 10; i++)
- {
- queue4.deliver(true);
- del = receiver4.getDelivery();
- log.info("Got delivery: " + del.getReference().getMessageID());
- del.acknowledge(null);
- }
- log.info("consumed queue4");
-
- log.info("Here are the sizes:");
- log.info("queue1, refs:" + queue1.memoryRefCount() + " dels:" + queue1.memoryDeliveryCount());
- log.info("queue2, refs:" + queue2.memoryRefCount() + " dels:" + queue2.memoryDeliveryCount());
- log.info("queue3, refs:" + queue3.memoryRefCount() + " dels:" + queue3.memoryDeliveryCount());
- log.info("queue4, refs:" + queue4.memoryRefCount() + " dels:" + queue4.memoryDeliveryCount());
- log.info("queue5, refs:" + queue5.memoryRefCount() + " dels:" + queue5.memoryDeliveryCount());
-
- }
- finally
- {
- if (office1 != null)
- {
- office1.stop();
- }
-
- if (office2 != null)
- {
- office2.stop();
- }
-
- if (office3 != null)
- {
- office3.stop();
- }
-
- if (office4 != null)
- {
- office4.stop();
- }
-
- if (office5 != null)
- {
- office5.stop();
- }
- }
- }
-
- class PullingReceiver implements Receiver
- {
- private Delivery del;
-
- public synchronized Delivery handle(DeliveryObserver observer, MessageReference reference, Transaction tx)
- {
- if (del != null)
- {
- return null;
- }
-
- del = new SimpleDelivery(observer, reference, false);
-
- this.notify();
-
- return del;
- }
-
- public synchronized Delivery getDelivery()
- {
- while (del == null)
- {
- try
- {
- this.wait();
- }
- catch (InterruptedException e)
- {
- }
- }
- Delivery ret = del;
- del = null;
- return ret;
- }
-
- }
-
- protected ClusteredPostOffice createClusteredPostOffice(int nodeId, String groupName) throws Exception
- {
- MessagePullPolicy pullPolicy = new DefaultMessagePullPolicy();
-
- FilterFactory ff = new SimpleFilterFactory();
-
- ClusterRouterFactory rf = new DefaultRouterFactory();
-
- DefaultClusteredPostOffice postOffice =
- new DefaultClusteredPostOffice(sc.getDataSource(), sc.getTransactionManager(),
- null, true, nodeId, "Clustered", ms, pm, tr, ff, pool,
- groupName,
- JGroupsUtil.getControlStackProperties(),
- JGroupsUtil.getDataStackProperties(),
- 5000, 5000, pullPolicy, rf, 1, 1000);
-
- postOffice.start();
-
- return postOffice;
- }
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
-}
-
-
-
Modified: trunk/tests/src/org/jboss/test/messaging/jms/BrowserTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/BrowserTest.java 2006-10-12 15:47:04 UTC (rev 1472)
+++ trunk/tests/src/org/jboss/test/messaging/jms/BrowserTest.java 2006-10-13 12:09:35 UTC (rev 1473)
@@ -100,8 +100,7 @@
{
ServerManagement.undeployQueue("Queue");
- connection.stop();
- connection = null;
+ connection.close();
super.tearDown();
}
More information about the jboss-cvs-commits
mailing list