Author: timfox
Date: 2010-09-16 11:39:03 -0400 (Thu, 16 Sep 2010)
New Revision: 9699
Added:
trunk/tests/src/org/hornetq/tests/integration/remoting/DirectDeliverTest.java
Modified:
trunk/src/main/org/hornetq/core/server/Queue.java
trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
Log:
https://jira.jboss.org/browse/HORNETQ-469
Modified: trunk/src/main/org/hornetq/core/server/Queue.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/Queue.java 2010-09-16 15:34:41 UTC (rev 9698)
+++ trunk/src/main/org/hornetq/core/server/Queue.java 2010-09-16 15:39:03 UTC (rev 9699)
@@ -155,4 +155,6 @@
void blockOnExecutorFuture();
void close() throws Exception;
+
+ boolean isDirectDeliver();
}
Modified: trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2010-09-16 15:34:41 UTC
(rev 9698)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2010-09-16 15:39:03 UTC
(rev 9699)
@@ -75,9 +75,9 @@
public static final int NUM_PRIORITIES = 10;
public static final int MAX_DELIVERIES_IN_LOOP = 1000;
-
- private static final int CHECK_QUEUE_SIZE_PERIOD = 2000;
+ public static final int CHECK_QUEUE_SIZE_PERIOD = 100;
+
private final long id;
private final SimpleString name;
@@ -138,10 +138,10 @@
private final Runnable concurrentPoller = new ConcurrentPoller();
- private volatile boolean queued;
-
- private volatile boolean checkQueueSize = true;
+ private volatile boolean checkDirect;
+ private volatile boolean directDeliver = true;
+
public QueueImpl(final long id,
final SimpleString address,
final SimpleString name,
@@ -191,11 +191,15 @@
{
public void run()
{
- checkQueueSize = true;
+ // This flag is periodically set to true. This enables the directDeliver flag
to be set to true if the queue
+ // is empty
+ // We don't want to evaluate that on every delivery since that's too
expensive
+
+ checkDirect = true;
}
}, CHECK_QUEUE_SIZE_PERIOD, CHECK_QUEUE_SIZE_PERIOD, TimeUnit.MILLISECONDS);
}
-
+
// Bindable implementation
-------------------------------------------------------------------------------------
public SimpleString getRoutingName()
@@ -252,12 +256,10 @@
{
return;
}
-
+
messageReferences.addHead(ref, ref.getMessage().getPriority());
-
- queued = true;
-
- checkQueueSize = false;
+
+ directDeliver = false;
}
public synchronized void reload(final MessageReference ref)
@@ -267,9 +269,7 @@
messageReferences.addTail(ref, ref.getMessage().getPriority());
}
- queued = true;
-
- checkQueueSize = false;
+ directDeliver = false;
messagesAdded++;
}
@@ -291,41 +291,47 @@
return;
}
- if (checkQueueSize)
+ // The checkDirect flag is periodically set to true, if the delivery is specified
as direct then this causes the
+ // directDeliver flag to be re-computed resulting in direct delivery if the queue
is empty
+ // We don't recompute it on every delivery since executing isEmpty is expensive
for a ConcurrentQueue
+ if (checkDirect)
{
- // This is an expensive operation so we don't want to do it every time we
add a message, that's why we use the checkQueueSize flag
- // which is set to true periodically using a scheduled executor
+ if (direct && !directDeliver && concurrentQueue.isEmpty()
&& messageReferences.isEmpty())
+ {
+ // We must block on the executor to ensure any async deliveries have
completed or we might get out of order
+ // deliveries
+ blockOnExecutorFuture();
- queued = !messageReferences.isEmpty() || !concurrentQueue.isEmpty();
-
- checkQueueSize = false;
+ // Go into direct delivery mode
+ directDeliver = true;
+ }
+ checkDirect = false;
}
- if (direct & !queued)
+ if (direct && directDeliver && deliverDirect(ref))
{
- if (deliverDirect(ref))
- {
- return;
- }
+ return;
}
concurrentQueue.add(ref);
+ directDeliver = false;
+
executor.execute(concurrentPoller);
}
-
+
public void deliverAsync()
{
executor.execute(deliverRunner);
}
-
+
public void close() throws Exception
{
if (checkQueueSizeFuture != null)
{
checkQueueSizeFuture.cancel(false);
}
-
+
cancelRedistributor();
}
@@ -486,7 +492,7 @@
redistributorFuture = null;
}
}
-
+
@Override
protected void finalize() throws Throwable
{
@@ -494,9 +500,9 @@
{
checkQueueSizeFuture.cancel(false);
}
-
+
cancelRedistributor();
-
+
super.finalize();
}
@@ -998,6 +1004,11 @@
{
return paused;
}
+
+ public boolean isDirectDeliver()
+ {
+ return directDeliver;
+ }
// Public
// -----------------------------------------------------------------------------
@@ -1075,10 +1086,10 @@
// Schedule another one - we do this to prevent a single thread getting
caught up in this loop for too long
deliverAsync();
-
+
return;
}
-
+
ConsumerHolder holder = consumerList.get(pos);
Consumer consumer = holder.consumer;
@@ -1169,7 +1180,7 @@
if (pos == size)
{
pos = 0;
- }
+ }
}
}
Added: trunk/tests/src/org/hornetq/tests/integration/remoting/DirectDeliverTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/remoting/DirectDeliverTest.java
(rev 0)
+++
trunk/tests/src/org/hornetq/tests/integration/remoting/DirectDeliverTest.java 2010-09-16
15:39:03 UTC (rev 9699)
@@ -0,0 +1,197 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.remoting;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.postoffice.Binding;
+import org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory;
+import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
+import org.hornetq.core.remoting.impl.netty.TransportConstants;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.impl.QueueImpl;
+import org.hornetq.tests.util.ServiceTestBase;
+
+/**
+ *
+ * A DirectDeliverTest
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ *
+ *
+ */
+public class DirectDeliverTest extends ServiceTestBase
+{
+
+ // Constants -----------------------------------------------------
+
+ private static final Logger log = Logger.getLogger(DirectDeliverTest.class);
+
+ // Attributes ----------------------------------------------------
+
+ private HornetQServer server;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ Map<String, Object> params = new HashMap<String, Object>();
+ params.put(TransportConstants.DIRECT_DELIVER, true);
+
+ TransportConfiguration tc = new
TransportConfiguration(NettyAcceptorFactory.class.getName(), params);
+
+ Configuration config = new ConfigurationImpl();
+ config.getAcceptorConfigurations().add(tc);
+
+ config.setSecurityEnabled(false);
+ server = createServer(false, config);
+ server.start();
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ server.stop();
+
+ server = null;
+
+ super.tearDown();
+ }
+
+ protected ClientSessionFactory createSessionFactory()
+ {
+ ClientSessionFactory sf = HornetQClient.createClientSessionFactory(new
TransportConfiguration(NettyConnectorFactory.class.getName()));
+
+ return sf;
+ }
+
+ public void testDirectDeliver() throws Exception
+ {
+ final String foo = "foo";
+
+ ClientSessionFactory sf = createSessionFactory();
+
+ ClientSession session = sf.createSession();
+
+ session.createQueue(foo, foo);
+
+ Binding binding = server.getPostOffice().getBinding(new SimpleString(foo));
+
+ Queue queue = (Queue)binding.getBindable();
+
+ assertTrue(queue.isDirectDeliver());
+
+ ClientProducer prod = session.createProducer(foo);
+
+ ClientConsumer cons = session.createConsumer(foo);
+
+ final int numMessages = 100;
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage msg = session.createMessage(true);
+
+ prod.send(msg);
+ }
+
+ queue.blockOnExecutorFuture();
+
+ //Consumer is not started so should go queued
+ assertFalse(queue.isDirectDeliver());
+
+ session.start();
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage msg = cons.receive(10000);
+
+ assertNotNull(msg);
+
+ msg.acknowledge();
+ }
+
+ Thread.sleep((long)(QueueImpl.CHECK_QUEUE_SIZE_PERIOD * 1.5));
+
+ //Add another message, should go direct
+ ClientMessage msg = session.createMessage(true);
+
+ prod.send(msg);
+
+ queue.blockOnExecutorFuture();
+
+ assertTrue(queue.isDirectDeliver());
+
+ //Send some more
+ for (int i = 0; i < numMessages; i++)
+ {
+ msg = session.createMessage(true);
+
+ prod.send(msg);
+ }
+
+ for (int i = 0; i < numMessages + 1; i++)
+ {
+ msg = cons.receive(10000);
+
+ assertNotNull(msg);
+
+ msg.acknowledge();
+ }
+
+ assertTrue(queue.isDirectDeliver());
+
+ session.stop();
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ msg = session.createMessage(true);
+
+ prod.send(msg);
+ }
+
+ assertFalse(queue.isDirectDeliver());
+
+
+ sf.close();
+ }
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java 2010-09-16
15:34:41 UTC (rev 9698)
+++ trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java 2010-09-16
15:39:03 UTC (rev 9699)
@@ -27,8 +27,21 @@
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.transaction.Transaction;
+/**
+ * A FakeQueue
+ *
+ * @author tim
+ *
+ *
+ */
public class FakeQueue implements Queue
{
+ public boolean isDirectDeliver()
+ {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
public void close()
{
// TODO Auto-generated method stub