[hornetq-commits] JBoss hornetq SVN: r8661 - in trunk: src/main/org/hornetq/core/server/impl and 3 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Thu Dec 10 04:44:05 EST 2009
Author: timfox
Date: 2009-12-10 04:44:05 -0500 (Thu, 10 Dec 2009)
New Revision: 8661
Modified:
trunk/src/main/org/hornetq/core/server/Queue.java
trunk/src/main/org/hornetq/core/server/ServerSession.java
trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
trunk/tests/src/org/hornetq/tests/concurrent/server/impl/QueueTest.java
trunk/tests/src/org/hornetq/tests/timing/core/server/impl/QueueImplTest.java
trunk/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java
Log:
simplified receive immediate logic on the server
Modified: trunk/src/main/org/hornetq/core/server/Queue.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/Queue.java 2009-12-10 09:19:17 UTC (rev 8660)
+++ trunk/src/main/org/hornetq/core/server/Queue.java 2009-12-10 09:44:05 UTC (rev 8661)
@@ -120,8 +120,6 @@
boolean hasMatchingConsumer(ServerMessage message);
- void deliverNow();
-
Collection<Consumer> getConsumers();
boolean checkDLQ(MessageReference ref) throws Exception;
Modified: trunk/src/main/org/hornetq/core/server/ServerSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/ServerSession.java 2009-12-10 09:19:17 UTC (rev 8660)
+++ trunk/src/main/org/hornetq/core/server/ServerSession.java 2009-12-10 09:44:05 UTC (rev 8661)
@@ -68,7 +68,7 @@
void close() throws Exception;
- void promptDelivery(Queue queue, boolean async);
+ void promptDelivery(Queue queue);
void handleAcknowledge(final SessionAcknowledgeMessage packet);
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2009-12-10 09:19:17 UTC (rev 8660)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2009-12-10 09:44:05 UTC (rev 8661)
@@ -337,20 +337,19 @@
* When the consumer receives such a "forced delivery" message, it discards it
* and knows that there are no other messages to be delivered.
*/
-
- // TODO - why is this executed on a different thread?
public synchronized void forceDelivery(final long sequence)
{
+ promptDelivery();
+
executor.execute(new Runnable()
{
public void run()
{
try
{
- // The prompt delivery is called synchronously to ensure the "forced delivery" message is
- // sent after any queue delivery.
- promptDelivery(false);
-
+ // We execute this on the same executor to make sure the force delivery message is written after
+ // any delivery is completed
+
ServerMessage forcedDeliveryMessage = new ServerMessageImpl(storageManager.generateUniqueID(), 50);
forcedDeliveryMessage.putLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE, sequence);
@@ -413,7 +412,7 @@
// Outside the lock
if (started)
{
- promptDelivery(true);
+ promptDelivery();
}
}
@@ -465,7 +464,7 @@
if (!transferring)
{
- promptDelivery(true);
+ promptDelivery();
}
}
@@ -491,7 +490,7 @@
if (previous <= 0 && previous + credits > 0)
{
- promptDelivery(true);
+ promptDelivery();
}
}
}
@@ -580,7 +579,7 @@
// Private --------------------------------------------------------------------------------------
- private void promptDelivery(final boolean asyncDelivery)
+ private void promptDelivery()
{
lock.lock();
try
@@ -595,18 +594,11 @@
{
if (browseOnly)
{
- if (asyncDelivery)
- {
- executor.execute(browserDeliverer);
- }
- else
- {
- browserDeliverer.run();
- }
+ executor.execute(browserDeliverer);
}
else
{
- session.promptDelivery(messageQueue, asyncDelivery);
+ session.promptDelivery(messageQueue);
}
}
}
@@ -668,7 +660,7 @@
else
{
// prompt Delivery only if chunk was finished
- session.promptDelivery(messageQueue, true);
+ session.promptDelivery(messageQueue);
}
}
}
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-12-10 09:19:17 UTC (rev 8660)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-12-10 09:44:05 UTC (rev 8661)
@@ -114,30 +114,7 @@
private static final Logger log = Logger.getLogger(ServerSessionImpl.class);
// Static -------------------------------------------------------------------------------
-
- // TODO not actually used currently
- // private static int offset;
- //
- // static
- // {
- // try
- // {
- // ServerMessage msg = new ServerMessageImpl(1, ChannelBuffers.EMPTY_BUFFER);
- //
- // msg.setDestination(new SimpleString("foobar"));
- //
- // int es = msg.getEncodeSize();
- //
- // int me = msg.getMemoryEstimate();
- //
- // offset = MessageReferenceImpl.getMemoryEstimate() + me - es;
- // }
- // catch (Exception e)
- // {
- // log.error("Failed to initialise mult and offset", e);
- // }
- // }
-
+
// Attributes ----------------------------------------------------------------------------
private final long id;
@@ -361,16 +338,9 @@
}
}
- public void promptDelivery(final Queue queue, final boolean async)
+ public void promptDelivery(final Queue queue)
{
- if (async)
- {
- queue.deliverAsync(executor);
- }
- else
- {
- queue.deliverNow();
- }
+ queue.deliverAsync(executor);
}
public void handleCreateConsumer(final SessionCreateConsumerMessage packet)
Modified: trunk/tests/src/org/hornetq/tests/concurrent/server/impl/QueueTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/concurrent/server/impl/QueueTest.java 2009-12-10 09:19:17 UTC (rev 8660)
+++ trunk/tests/src/org/hornetq/tests/concurrent/server/impl/QueueTest.java 2009-12-10 09:44:05 UTC (rev 8661)
@@ -21,6 +21,7 @@
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.impl.QueueImpl;
import org.hornetq.tests.unit.core.server.impl.fakes.FakeConsumer;
import org.hornetq.tests.unit.core.server.impl.fakes.FakeQueueFactory;
import org.hornetq.tests.util.UnitTestCase;
@@ -60,7 +61,7 @@
*/
public void testConcurrentAddsDeliver() throws Exception
{
- Queue queue = queueFactory.createQueue(1,
+ QueueImpl queue = (QueueImpl)queueFactory.createQueue(1,
new SimpleString("address1"),
new SimpleString("queue1"),
null,
@@ -162,7 +163,7 @@
{
private volatile Exception e;
- private final Queue queue;
+ private final QueueImpl queue;
private final FakeConsumer consumer;
@@ -182,7 +183,7 @@
return e;
}
- Toggler(final Queue queue, final FakeConsumer consumer, final long testTime)
+ Toggler(final QueueImpl queue, final FakeConsumer consumer, final long testTime)
{
this.testTime = testTime;
Modified: trunk/tests/src/org/hornetq/tests/timing/core/server/impl/QueueImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/timing/core/server/impl/QueueImplTest.java 2009-12-10 09:19:17 UTC (rev 8660)
+++ trunk/tests/src/org/hornetq/tests/timing/core/server/impl/QueueImplTest.java 2009-12-10 09:44:05 UTC (rev 8661)
@@ -76,7 +76,7 @@
public void testScheduledNoConsumer() throws Exception
{
- Queue queue = new QueueImpl(1,
+ QueueImpl queue = new QueueImpl(1,
new SimpleString("address1"),
new SimpleString("queue1"),
null,
@@ -150,7 +150,7 @@
private void testScheduled(final boolean direct) throws Exception
{
- Queue queue = new QueueImpl(1,
+ QueueImpl queue = new QueueImpl(1,
new SimpleString("address1"),
new SimpleString("queue1"),
null,
@@ -265,7 +265,7 @@
return HandleStatus.HANDLED;
}
};
- Queue queue = new QueueImpl(1,
+ QueueImpl queue = new QueueImpl(1,
new SimpleString("address1"),
QueueImplTest.queue1,
null,
Modified: trunk/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java 2009-12-10 09:19:17 UTC (rev 8660)
+++ trunk/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java 2009-12-10 09:44:05 UTC (rev 8661)
@@ -69,7 +69,7 @@
{
final SimpleString name = new SimpleString("oobblle");
- Queue queue = new QueueImpl(1,
+ QueueImpl queue = new QueueImpl(1,
QueueImplTest.address1,
name,
null,
@@ -85,7 +85,7 @@
public void testDurable()
{
- Queue queue = new QueueImpl(1,
+ QueueImpl queue = new QueueImpl(1,
QueueImplTest.address1,
QueueImplTest.queue1,
null,
@@ -120,7 +120,7 @@
Consumer cons3 = new FakeConsumer();
- Queue queue = new QueueImpl(1,
+ QueueImpl queue = new QueueImpl(1,
QueueImplTest.address1,
QueueImplTest.queue1,
null,
@@ -170,7 +170,7 @@
public void testGetFilter()
{
- Queue queue = new QueueImpl(1,
+ QueueImpl queue = new QueueImpl(1,
QueueImplTest.address1,
QueueImplTest.queue1,
null,
@@ -213,7 +213,7 @@
public void testSimpleadd()
{
- Queue queue = new QueueImpl(1,
+ QueueImpl queue = new QueueImpl(1,
QueueImplTest.address1,
QueueImplTest.queue1,
null,
@@ -241,7 +241,7 @@
public void testSimpleDirectDelivery() throws Exception
{
- Queue queue = new QueueImpl(1,
+ QueueImpl queue = new QueueImpl(1,
QueueImplTest.address1,
QueueImplTest.queue1,
null,
@@ -278,7 +278,7 @@
public void testSimpleNonDirectDelivery() throws Exception
{
- Queue queue = new QueueImpl(1,
+ QueueImpl queue = new QueueImpl(1,
QueueImplTest.address1,
QueueImplTest.queue1,
null,
@@ -325,7 +325,7 @@
public void testBusyConsumer() throws Exception
{
- Queue queue = new QueueImpl(1,
+ QueueImpl queue = new QueueImpl(1,
QueueImplTest.address1,
QueueImplTest.queue1,
null,
@@ -378,7 +378,7 @@
public void testBusyConsumerThenAddMoreMessages() throws Exception
{
- Queue queue = new QueueImpl(1,
+ QueueImpl queue = new QueueImpl(1,
QueueImplTest.address1,
QueueImplTest.queue1,
null,
@@ -454,7 +454,7 @@
public void testAddFirstadd() throws Exception
{
- Queue queue = new QueueImpl(1,
+ QueueImpl queue = new QueueImpl(1,
QueueImplTest.address1,
QueueImplTest.queue1,
null,
@@ -517,7 +517,7 @@
public void testChangeConsumersAndDeliver() throws Exception
{
- Queue queue = new QueueImpl(1,
+ QueueImpl queue = new QueueImpl(1,
QueueImplTest.address1,
QueueImplTest.queue1,
null,
@@ -680,7 +680,7 @@
public void testConsumerReturningNull() throws Exception
{
- Queue queue = new QueueImpl(1,
+ QueueImpl queue = new QueueImpl(1,
QueueImplTest.address1,
QueueImplTest.queue1,
null,
@@ -722,7 +722,7 @@
public void testRoundRobinWithQueueing() throws Exception
{
- Queue queue = new QueueImpl(1,
+ QueueImpl queue = new QueueImpl(1,
QueueImplTest.address1,
QueueImplTest.queue1,
null,
@@ -774,7 +774,7 @@
public void testRoundRobinDirect() throws Exception
{
- Queue queue = new QueueImpl(1,
+ QueueImpl queue = new QueueImpl(1,
QueueImplTest.address1,
QueueImplTest.queue1,
null,
@@ -824,7 +824,7 @@
public void testWithPriorities() throws Exception
{
- Queue queue = new QueueImpl(1,
+ QueueImpl queue = new QueueImpl(1,
QueueImplTest.address1,
QueueImplTest.queue1,
null,
@@ -900,7 +900,7 @@
public void testConsumerWithFilterAddAndRemove()
{
- Queue queue = new QueueImpl(1,
+ QueueImpl queue = new QueueImpl(1,
QueueImplTest.address1,
QueueImplTest.queue1,
null,
@@ -918,7 +918,7 @@
public void testList()
{
- Queue queue = new QueueImpl(1,
+ QueueImpl queue = new QueueImpl(1,
QueueImplTest.address1,
QueueImplTest.queue1,
null,
@@ -951,7 +951,7 @@
public void testListWithFilter()
{
- Queue queue = new QueueImpl(1,
+ QueueImpl queue = new QueueImpl(1,
QueueImplTest.address1,
QueueImplTest.queue1,
null,
@@ -996,7 +996,7 @@
public void testConsumeWithFiltersAddAndRemoveConsumer() throws Exception
{
- Queue queue = new QueueImpl(1,
+ QueueImpl queue = new QueueImpl(1,
QueueImplTest.address1,
QueueImplTest.queue1,
null,
@@ -1074,7 +1074,7 @@
public void testBusyConsumerWithFilterFirstCallBusy() throws Exception
{
- Queue queue = new QueueImpl(1,
+ QueueImpl queue = new QueueImpl(1,
QueueImplTest.address1,
QueueImplTest.queue1,
null,
@@ -1124,7 +1124,7 @@
public void testBusyConsumerWithFilterThenAddMoreMessages() throws Exception
{
- Queue queue = new QueueImpl(1,
+ QueueImpl queue = new QueueImpl(1,
QueueImplTest.address1,
QueueImplTest.queue1,
null,
@@ -1207,7 +1207,7 @@
public void testConsumerWithFilterThenAddMoreMessages() throws Exception
{
- Queue queue = new QueueImpl(1,
+ QueueImpl queue = new QueueImpl(1,
QueueImplTest.address1,
QueueImplTest.queue1,
null,
@@ -1280,7 +1280,7 @@
private void testConsumerWithFilters(final boolean direct) throws Exception
{
- Queue queue = new QueueImpl(1,
+ QueueImpl queue = new QueueImpl(1,
QueueImplTest.address1,
QueueImplTest.queue1,
null,
@@ -1378,7 +1378,7 @@
public void testMessageOrder() throws Exception
{
FakeConsumer consumer = new FakeConsumer();
- Queue queue = new QueueImpl(1,
+ QueueImpl queue = new QueueImpl(1,
QueueImplTest.address1,
QueueImplTest.queue1,
null,
@@ -1407,7 +1407,7 @@
public void testMessagesAdded() throws Exception
{
- Queue queue = new QueueImpl(1,
+ QueueImpl queue = new QueueImpl(1,
QueueImplTest.address1,
QueueImplTest.queue1,
null,
@@ -1428,7 +1428,7 @@
public void testGetReference() throws Exception
{
- Queue queue = new QueueImpl(1,
+ QueueImpl queue = new QueueImpl(1,
QueueImplTest.address1,
QueueImplTest.queue1,
null,
@@ -1450,7 +1450,7 @@
public void testGetNonExistentReference() throws Exception
{
- Queue queue = new QueueImpl(1,
+ QueueImpl queue = new QueueImpl(1,
QueueImplTest.address1,
QueueImplTest.queue1,
null,
@@ -1476,7 +1476,7 @@
*/
public void testPauseAndResumeWithAsync() throws Exception
{
- Queue queue = new QueueImpl(1,
+ QueueImpl queue = new QueueImpl(1,
QueueImplTest.address1,
QueueImplTest.queue1,
null,
@@ -1540,7 +1540,7 @@
public void testPauseAndResumeWithDirect() throws Exception
{
- Queue queue = new QueueImpl(1,
+ QueueImpl queue = new QueueImpl(1,
QueueImplTest.address1,
QueueImplTest.queue1,
null,
@@ -1588,7 +1588,7 @@
class AddtoQueueRunner implements Runnable
{
- Queue queue;
+ QueueImpl queue;
MessageReference messageReference;
@@ -1599,7 +1599,7 @@
boolean first;
public AddtoQueueRunner(final boolean first,
- final Queue queue,
+ final QueueImpl queue,
final MessageReference messageReference,
final CountDownLatch countDownLatch)
{
More information about the hornetq-commits
mailing list