[jboss-cvs] JBoss Messaging SVN: r5146 - in trunk: src/main/org/jboss/messaging/core/server/impl and 3 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Sun Oct 19 04:48:49 EDT 2008
Author: timfox
Date: 2008-10-19 04:48:48 -0400 (Sun, 19 Oct 2008)
New Revision: 5146
Modified:
trunk/src/main/org/jboss/messaging/core/server/Queue.java
trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
trunk/tests/src/org/jboss/messaging/tests/concurrent/server/impl/QueueTest.java
trunk/tests/src/org/jboss/messaging/tests/timing/core/server/impl/QueueImplTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java
Log:
Fixed queue delivery tests
Modified: trunk/src/main/org/jboss/messaging/core/server/Queue.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/Queue.java 2008-10-19 08:38:07 UTC (rev 5145)
+++ trunk/src/main/org/jboss/messaging/core/server/Queue.java 2008-10-19 08:48:48 UTC (rev 5146)
@@ -57,9 +57,7 @@
* @param list
*/
void addListFirst(LinkedList<MessageReference> list);
-
- void deliver();
-
+
void deliverAsync(Executor executor);
void addConsumer(Consumer consumer);
@@ -144,4 +142,7 @@
MessageReference removeFirst();
boolean consumerFailedOver();
+
+ //Only used in testing
+ void deliverNow();
}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2008-10-19 08:38:07 UTC (rev 5145)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2008-10-19 08:48:48 UTC (rev 5146)
@@ -195,84 +195,14 @@
executor.execute(deliverRunner);
}
}
-
- /*
- * Attempt to deliver all the messages in the queue
- */
- public void deliver()
+
+ // Only used in testing - do not call directly!
+ public synchronized void deliverNow()
{
- // We don't do actual delivery if the queue is on a backup node - this is
- // because it's async and could get out of step
- // with the live node. Instead, when we replicate the delivery we remove
- // the ref from the queue
-
- if (backup)
- {
- return ;
- }
-
- MessageReference reference;
-
- Iterator<MessageReference> iterator = null;
-
- while (true)
- {
- if (iterator == null)
- {
- reference = messageReferences.peekFirst();
- }
- else
- {
- if (iterator.hasNext())
- {
- reference = iterator.next();
- }
- else
- {
- reference = null;
- }
- }
-
- if (reference == null)
- {
- if (iterator == null)
- {
- // We delivered all the messages - go into direct delivery
- direct = true;
-
- promptDelivery = false;
- }
- return;
- }
-
- HandleStatus status = deliver(reference);
-
- if (status == HandleStatus.HANDLED)
- {
- if (iterator == null)
- {
- messageReferences.removeFirst();
- }
- else
- {
- iterator.remove();
- }
- }
- else if (status == HandleStatus.BUSY)
- {
- // All consumers busy - give up
- break;
- }
- else if (status == HandleStatus.NO_MATCH && iterator == null)
- {
- // Consumers not all busy - but filter not accepting - iterate
- // back
- // through the queue
- iterator = messageReferences.iterator();
- }
- }
+ deliver();
}
+
public synchronized void addConsumer(final Consumer consumer)
{
distributionPolicy.addConsumer(consumer);
@@ -668,6 +598,83 @@
// Private
// ------------------------------------------------------------------------------
+ /*
+ * Attempt to deliver all the messages in the queue
+ */
+ private void deliver()
+ {
+ // We don't do actual delivery if the queue is on a backup node - this is
+ // because it's async and could get out of step
+ // with the live node. Instead, when we replicate the delivery we remove
+ // the ref from the queue
+
+ if (backup)
+ {
+ return;
+ }
+
+ MessageReference reference;
+
+ Iterator<MessageReference> iterator = null;
+
+ while (true)
+ {
+ if (iterator == null)
+ {
+ reference = messageReferences.peekFirst();
+ }
+ else
+ {
+ if (iterator.hasNext())
+ {
+ reference = iterator.next();
+ }
+ else
+ {
+ reference = null;
+ }
+ }
+
+ if (reference == null)
+ {
+ if (iterator == null)
+ {
+ // We delivered all the messages - go into direct delivery
+ direct = true;
+
+ promptDelivery = false;
+ }
+ return;
+ }
+
+ HandleStatus status = deliver(reference);
+
+ if (status == HandleStatus.HANDLED)
+ {
+ if (iterator == null)
+ {
+ messageReferences.removeFirst();
+ }
+ else
+ {
+ iterator.remove();
+ }
+ }
+ else if (status == HandleStatus.BUSY)
+ {
+ // All consumers busy - give up
+ break;
+ }
+ else if (status == HandleStatus.NO_MATCH && iterator == null)
+ {
+ // Consumers not all busy - but filter not accepting - iterate
+ // back
+ // through the queue
+ iterator = messageReferences.iterator();
+ }
+ }
+ }
+
private synchronized HandleStatus add(final MessageReference ref, final boolean first)
{
if (!first)
Modified: trunk/tests/src/org/jboss/messaging/tests/concurrent/server/impl/QueueTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/concurrent/server/impl/QueueTest.java 2008-10-19 08:38:07 UTC (rev 5145)
+++ trunk/tests/src/org/jboss/messaging/tests/concurrent/server/impl/QueueTest.java 2008-10-19 08:48:48 UTC (rev 5146)
@@ -79,7 +79,7 @@
consumer.setStatusImmediate(HandleStatus.HANDLED);
- queue.deliver();
+ queue.deliverNow();
if (sender.getException() != null)
{
@@ -196,7 +196,7 @@
{
consumer.setStatusImmediate(HandleStatus.HANDLED);
- queue.deliver();
+ queue.deliverNow();
}
toggle = !toggle;
Modified: trunk/tests/src/org/jboss/messaging/tests/timing/core/server/impl/QueueImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/timing/core/server/impl/QueueImplTest.java 2008-10-19 08:38:07 UTC (rev 5145)
+++ trunk/tests/src/org/jboss/messaging/tests/timing/core/server/impl/QueueImplTest.java 2008-10-19 08:48:48 UTC (rev 5146)
@@ -141,7 +141,7 @@
queue.addConsumer(consumer);
- queue.deliver();
+ queue.deliverNow();
assertRefListsIdenticalRefs(refs, consumer.getReferences());
}
@@ -201,7 +201,7 @@
queue.addConsumer(consumer);
- queue.deliver();
+ queue.deliverNow();
}
List<MessageReference> refs = new ArrayList<MessageReference>();
@@ -328,7 +328,7 @@
queue.addConsumer(consumer);
- queue.deliver();
+ queue.deliverNow();
assertTrue(consumer.getReferences().isEmpty());
}
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java 2008-10-19 08:38:07 UTC (rev 5145)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java 2008-10-19 08:48:48 UTC (rev 5146)
@@ -273,7 +273,7 @@
assertEquals(10, queue.getMessageCount());
assertEquals(0, queue.getScheduledCount());
- queue.deliver();
+ queue.deliverNow();
assertRefListsIdenticalRefs(refs, consumer.getReferences());
assertEquals(numMessages, queue.getMessageCount());
@@ -308,7 +308,7 @@
assertEquals(0, queue.getScheduledCount());
assertEquals(0, queue.getDeliveringCount());
- queue.deliver();
+ queue.deliverNow();
assertEquals(10, queue.getMessageCount());
assertEquals(0, queue.getScheduledCount());
@@ -317,7 +317,7 @@
consumer.setStatusImmediate(HandleStatus.HANDLED);
- queue.deliver();
+ queue.deliverNow();
assertRefListsIdenticalRefs(refs, consumer.getReferences());
assertEquals(10, queue.getMessageCount());
@@ -352,7 +352,7 @@
assertEquals(0, queue.getScheduledCount());
assertEquals(0, queue.getDeliveringCount());
- queue.deliver();
+ queue.deliverNow();
assertEquals(10, queue.getMessageCount());
assertEquals(0, queue.getScheduledCount());
@@ -384,7 +384,7 @@
queue.addLast(ref);
}
- queue.deliver();
+ queue.deliverNow();
assertRefListsIdenticalRefs(refs, consumer.getReferences());
assertEquals(30, queue.getMessageCount());
@@ -435,7 +435,7 @@
queue.addConsumer(consumer);
- queue.deliver();
+ queue.deliverNow();
List<MessageReference> allRefs = new ArrayList<MessageReference>();
@@ -472,7 +472,7 @@
queue.addConsumer(cons1);
- queue.deliver();
+ queue.deliverNow();
assertEquals(numMessages, queue.getMessageCount());
assertEquals(0, queue.getScheduledCount());
@@ -658,7 +658,7 @@
queue.addConsumer(cons2);
- queue.deliver();
+ queue.deliverNow();
assertEquals(numMessages / 2, cons1.getReferences().size());
@@ -692,7 +692,7 @@
queue.addConsumer(cons2);
- queue.deliver();
+ queue.deliverNow();
for (int i = 0; i < numMessages; i++)
{
@@ -796,7 +796,7 @@
queue.addConsumer(consumer);
- queue.deliver();
+ queue.deliverNow();
assertTrue(consumer.getReferences().isEmpty());
}
@@ -824,7 +824,7 @@
queue.addConsumer(consumer);
- queue.deliver();
+ queue.deliverNow();
List<MessageReference> receivedRefs = consumer.getReferences();
@@ -978,7 +978,7 @@
queue.addConsumer(consumer);
- queue.deliver();
+ queue.deliverNow();
refs.clear();
@@ -1069,7 +1069,7 @@
{
queue.addConsumer(consumer);
- queue.deliver();
+ queue.deliverNow();
}
assertEquals(6, queue.getMessageCount());
@@ -1089,7 +1089,7 @@
queue.addConsumer(consumer);
- queue.deliver();
+ queue.deliverNow();
assertEquals(4, queue.getMessageCount());
@@ -1113,7 +1113,7 @@
EasyMock.expect(consumer.handle(messageReference2)).andReturn(HandleStatus.HANDLED);
EasyMock.replay(consumer);
queue.addConsumer(consumer);
- queue.deliver();
+ queue.deliverNow();
EasyMock.verify(consumer);
}
@@ -1167,7 +1167,7 @@
queue.addListFirst(messageReferences);
queue.removeReferenceWithID(2);
queue.addConsumer(consumer);
- queue.deliver();
+ queue.deliverNow();
EasyMock.verify(consumer);
}
More information about the jboss-cvs-commits
mailing list