Author: timfox
Date: 2010-01-11 13:07:20 -0500 (Mon, 11 Jan 2010)
New Revision: 8790
Modified:
trunk/docs/user-manual/en/configuration-index.xml
trunk/docs/user-manual/en/connection-ttl.xml
trunk/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java
trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
trunk/src/main/org/hornetq/core/remoting/Packet.java
trunk/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java
trunk/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java
trunk/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java
trunk/src/main/org/hornetq/core/remoting/impl/wireformat/RollbackMessage.java
trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionCloseMessage.java
trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionXACommitMessage.java
trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionXAPrepareMessage.java
trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionXARollbackMessage.java
trunk/src/main/org/hornetq/core/server/Queue.java
trunk/src/main/org/hornetq/core/server/ServerSession.java
trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
trunk/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java
trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
trunk/src/main/org/hornetq/core/server/impl/LastValueQueue.java
trunk/src/main/org/hornetq/core/server/impl/QueueFactoryImpl.java
trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
trunk/tests/jms-tests/src/org/hornetq/jms/tests/BrowserTest.java
trunk/tests/jms-tests/src/org/hornetq/jms/tests/MiscellaneousTest.java
trunk/tests/src/org/hornetq/tests/integration/client/ReceiveImmediateTest.java
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/ReplicatedLargeMessageFailoverTest.java
trunk/tests/src/org/hornetq/tests/timing/core/server/impl/QueueImplTest.java
trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
trunk/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java
trunk/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakeQueueFactory.java
Log:
some packets now processed async + queue has own executor to minimise remoting threads
being held for too long, also some other tweaks
Modified: trunk/docs/user-manual/en/configuration-index.xml
===================================================================
--- trunk/docs/user-manual/en/configuration-index.xml 2010-01-11 16:04:54 UTC (rev 8789)
+++ trunk/docs/user-manual/en/configuration-index.xml 2010-01-11 18:07:20 UTC (rev 8790)
@@ -14,7 +14,7 @@
<!--
-->
<!-- Red Hat, as the licensor of this document, waives the right to enforce,
-->
<!-- and agrees not to assert, Section 4d of CC-BY-SA to the fullest extent
-->
-<!-- permitted by applicable law.
-->
+<!-- permitted by applicable law. a
-->
<!-- =============================================================================
-->
<chapter id="configuration-index">
<title>Configuration Reference</title>
@@ -352,7 +352,7 @@
<entry>Should incoming packets on the server be handed
off to a thread
from the thread pool for processing or should they be
handled on the
remoting thread?</entry>
- <entry>false</entry>
+ <entry>true</entry>
</row>
<row>
<entry><link linkend="transaction-config"
Modified: trunk/docs/user-manual/en/connection-ttl.xml
===================================================================
--- trunk/docs/user-manual/en/connection-ttl.xml 2010-01-11 16:04:54 UTC (rev 8789)
+++ trunk/docs/user-manual/en/connection-ttl.xml 2010-01-11 18:07:20 UTC (rev 8790)
@@ -160,10 +160,12 @@
<title>Configuring Asynchronous Connection Execution</title>
<para>By default, packets received on the server side are executed on the
remoting
thread.</para>
- <para>It is possible instead to use a thread from a thread pool to handle
the packents so
+ <para>It is possible instead to use a thread from a thread pool to handle
some packets so
that the remoting thread is not tied up for too long. However, please note
that
- processing operations asynchronously on another thread adds a little more
latency. To
- enable asynchronous connection execution, set the parameter <literal
+ processing operations asynchronously on another thread adds a little more
latency.
+ Please note that most short running operations are always handled on the
remoting thread for performance reasons.
+
+ To enable asynchronous connection execution, set the parameter <literal
async-connection-execution-enabled</literal> in <literal
hornetq-configuration.xml</literal> to <literal>true</literal> (default
value is
<literal>false</literal>).</para>
Modified: trunk/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java 2010-01-11 16:04:54
UTC (rev 8789)
+++ trunk/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java 2010-01-11 18:07:20
UTC (rev 8790)
@@ -72,7 +72,7 @@
public static final long DEFAULT_CONNECTION_TTL_OVERRIDE = -1;
- public static final boolean DEFAULT_ASYNC_CONNECTION_EXECUTION_ENABLED = false;
+ public static final boolean DEFAULT_ASYNC_CONNECTION_EXECUTION_ENABLED = true;
public static final String DEFAULT_BINDINGS_DIRECTORY = "data/bindings";
Modified: trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2010-01-11
16:04:54 UTC (rev 8789)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2010-01-11
18:07:20 UTC (rev 8790)
@@ -111,8 +111,6 @@
private final Object notificationLock = new Object();
- private final org.hornetq.utils.ExecutorFactory redistributorExecutorFactory;
-
private final HierarchicalRepository<AddressSettings>
addressSettingsRepository;
private final HornetQServer server;
@@ -127,7 +125,6 @@
final boolean enableWildCardRouting,
final int idCacheSize,
final boolean persistIDCache,
- final ExecutorFactory orderedExecutorFactory,
final HierarchicalRepository<AddressSettings>
addressSettingsRepository)
{
@@ -156,8 +153,6 @@
this.persistIDCache = persistIDCache;
- redistributorExecutorFactory = orderedExecutorFactory;
-
this.addressSettingsRepository = addressSettingsRepository;
this.server = server;
@@ -350,7 +345,7 @@
if (redistributionDelay != -1)
{
- queue.addRedistributor(redistributionDelay,
redistributorExecutorFactory.getExecutor());
+ queue.addRedistributor(redistributionDelay);
}
}
}
@@ -420,7 +415,7 @@
if (redistributionDelay != -1)
{
- queue.addRedistributor(redistributionDelay,
redistributorExecutorFactory.getExecutor());
+ queue.addRedistributor(redistributionDelay);
}
}
}
Modified: trunk/src/main/org/hornetq/core/remoting/Packet.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/Packet.java 2010-01-11 16:04:54 UTC (rev
8789)
+++ trunk/src/main/org/hornetq/core/remoting/Packet.java 2010-01-11 18:07:20 UTC (rev
8790)
@@ -80,4 +80,6 @@
* @return true if confirmation is required
*/
boolean isRequiresConfirmations();
+
+ boolean isAsyncExec();
}
Modified: trunk/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java 2010-01-11 16:04:54
UTC (rev 8789)
+++ trunk/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java 2010-01-11 18:07:20
UTC (rev 8790)
@@ -110,6 +110,7 @@
import org.hornetq.core.remoting.impl.wireformat.SessionBindingQueryMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionBindingQueryResponseMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionCloseMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionCommitMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionConsumerCloseMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
@@ -221,7 +222,7 @@
}
case SESS_COMMIT:
{
- packet = new PacketImpl(PacketImpl.SESS_COMMIT);
+ packet = new SessionCommitMessage();
break;
}
case SESS_ROLLBACK:
Modified: trunk/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java 2010-01-11
16:04:54 UTC (rev 8789)
+++ trunk/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java 2010-01-11
18:07:20 UTC (rev 8790)
@@ -334,12 +334,12 @@
channels.clear();
}
}
-
+
public void flushConfirmations()
{
synchronized (transferLock)
{
- for (Channel channel: channels.values())
+ for (Channel channel : channels.values())
{
channel.flushConfirmations();
}
@@ -349,18 +349,16 @@
// Buffer Handler implementation
// ----------------------------------------------------
+ private volatile boolean executing;
+
public void bufferReceived(final Object connectionID, final HornetQBuffer buffer)
{
final Packet packet = decoder.decode(buffer);
- if (executor == null || packet.getType() == PacketImpl.PING)
+ if (packet.isAsyncExec() && executor != null)
{
- // Pings must always be handled out of band so we can send pings back to the
client quickly
- // otherwise they would get in the queue with everything else which might give
an intolerable delay
- doBufferReceived(packet);
- }
- else
- {
+ executing = true;
+
executor.execute(new Runnable()
{
public void run()
@@ -373,10 +371,24 @@
{
RemotingConnectionImpl.log.error("Unexpected error", t);
}
+
+ executing = false;
}
});
}
-
+ else
+ {
+ //To prevent out of order execution if interleaving sync and async operations on
same connection
+ while (executing)
+ {
+ Thread.yield();
+ }
+
+ // Pings must always be handled out of band so we can send pings back to the
client quickly
+ // otherwise they would get in the queue with everything else which might give
an intolerable delay
+ doBufferReceived(packet);
+ }
+
dataReceived = true;
}
Modified: trunk/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java 2010-01-11
16:04:54 UTC (rev 8789)
+++ trunk/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java 2010-01-11
18:07:20 UTC (rev 8790)
@@ -262,6 +262,11 @@
{
return true;
}
+
+ public boolean isAsyncExec()
+ {
+ return false;
+ }
@Override
public String toString()
Modified: trunk/src/main/org/hornetq/core/remoting/impl/wireformat/RollbackMessage.java
===================================================================
---
trunk/src/main/org/hornetq/core/remoting/impl/wireformat/RollbackMessage.java 2010-01-11
16:04:54 UTC (rev 8789)
+++
trunk/src/main/org/hornetq/core/remoting/impl/wireformat/RollbackMessage.java 2010-01-11
18:07:20 UTC (rev 8790)
@@ -72,6 +72,11 @@
{
considerLastMessageAsDelivered = buffer.readBoolean();
}
+
+ public boolean isAsyncExec()
+ {
+ return true;
+ }
// Static --------------------------------------------------------
Modified:
trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionCloseMessage.java
===================================================================
---
trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionCloseMessage.java 2010-01-11
16:04:54 UTC (rev 8789)
+++
trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionCloseMessage.java 2010-01-11
18:07:20 UTC (rev 8790)
@@ -44,6 +44,12 @@
return super.equals(other);
}
+
+ @Override
+ public boolean isAsyncExec()
+ {
+ return true;
+ }
// Package protected ---------------------------------------------
Modified:
trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionXACommitMessage.java
===================================================================
---
trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionXACommitMessage.java 2010-01-11
16:04:54 UTC (rev 8789)
+++
trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionXACommitMessage.java 2010-01-11
18:07:20 UTC (rev 8790)
@@ -62,6 +62,12 @@
}
@Override
+ public boolean isAsyncExec()
+ {
+ return true;
+ }
+
+ @Override
public void encodeRest(final HornetQBuffer buffer)
{
XidCodecSupport.encodeXid(xid, buffer);
Modified:
trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionXAPrepareMessage.java
===================================================================
---
trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionXAPrepareMessage.java 2010-01-11
16:04:54 UTC (rev 8789)
+++
trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionXAPrepareMessage.java 2010-01-11
18:07:20 UTC (rev 8790)
@@ -64,6 +64,11 @@
{
xid = XidCodecSupport.decodeXid(buffer);
}
+
+ public boolean isAsyncExec()
+ {
+ return true;
+ }
@Override
public boolean equals(final Object other)
Modified:
trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionXARollbackMessage.java
===================================================================
---
trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionXARollbackMessage.java 2010-01-11
16:04:54 UTC (rev 8789)
+++
trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionXARollbackMessage.java 2010-01-11
18:07:20 UTC (rev 8790)
@@ -52,6 +52,7 @@
{
return xid;
}
+
@Override
public void encodeRest(final HornetQBuffer buffer)
@@ -64,6 +65,11 @@
{
xid = XidCodecSupport.decodeXid(buffer);
}
+
+ public boolean isAsyncExec()
+ {
+ return true;
+ }
@Override
public boolean equals(final Object other)
Modified: trunk/src/main/org/hornetq/core/server/Queue.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/Queue.java 2010-01-11 16:04:54 UTC (rev 8789)
+++ trunk/src/main/org/hornetq/core/server/Queue.java 2010-01-11 18:07:20 UTC (rev 8790)
@@ -63,7 +63,7 @@
void cancel(MessageReference reference) throws Exception;
- void deliverAsync(Executor executor);
+ void deliverAsync();
int getMessageCount();
@@ -116,7 +116,7 @@
int moveReferences(Filter filter, SimpleString toAddress) throws Exception;
- void addRedistributor(long delay, Executor executor);
+ void addRedistributor(long delay);
void cancelRedistributor() throws Exception;
@@ -152,6 +152,8 @@
* @return true if paused, false otherwise.
*/
boolean isPaused();
+
+ Executor getExecutor();
}
Modified: trunk/src/main/org/hornetq/core/server/ServerSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/ServerSession.java 2010-01-11 16:04:54 UTC (rev
8789)
+++ trunk/src/main/org/hornetq/core/server/ServerSession.java 2010-01-11 18:07:20 UTC (rev
8790)
@@ -68,8 +68,6 @@
void close() throws Exception;
- void promptDelivery(Queue queue);
-
void handleAcknowledge(final SessionAcknowledgeMessage packet);
void handleExpired(final SessionExpiredMessage packet);
Modified: trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2010-01-11
16:04:54 UTC (rev 8789)
+++ trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2010-01-11
18:07:20 UTC (rev 8790)
@@ -540,7 +540,7 @@
if (queue != null)
{
- queue.deliverAsync(executor);
+ queue.deliverAsync();
}
}
}
@@ -683,7 +683,7 @@
queue.addConsumer(BridgeImpl.this);
- queue.deliverAsync(executor);
+ queue.deliverAsync();
BridgeImpl.log.info("Bridge " + name + " is connected to its
destination");
@@ -762,7 +762,7 @@
if (queue != null)
{
- queue.deliverAsync(executor);
+ queue.deliverAsync();
}
}
catch (Exception e)
Modified: trunk/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java 2010-01-11
16:04:54 UTC (rev 8789)
+++ trunk/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java 2010-01-11
18:07:20 UTC (rev 8790)
@@ -189,7 +189,7 @@
{
active = true;
- queue.deliverAsync(executor);
+ queue.deliverAsync();
}
}
}
Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-01-11 16:04:54
UTC (rev 8789)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-01-11 18:07:20
UTC (rev 8790)
@@ -633,8 +633,6 @@
Channel channel = connection.getChannel(channelID, sendWindowSize);
- Executor sessionExecutor = executorFactory.getExecutor();
-
final ServerSessionImpl session = new ServerSessionImpl(name,
username,
password,
@@ -649,7 +647,6 @@
postOffice,
resourceManager,
securityStore,
- sessionExecutor,
channel,
managementService,
this,
@@ -657,10 +654,8 @@
sessions.put(name, session);
- // The executor on the OperationContext here has to be the same as the session, or
we would have ordering issues
- // on messages
ServerSessionPacketHandler handler = new ServerSessionPacketHandler(session,
-
storageManager.newContext(sessionExecutor),
+
storageManager.newContext(executorFactory.getExecutor()),
storageManager);
session.setHandler(handler);
@@ -1030,8 +1025,9 @@
if (ConfigurationImpl.DEFAULT_CLUSTER_USER.equals(configuration.getClusterUser())
&&
ConfigurationImpl.DEFAULT_CLUSTER_PASSWORD.equals(configuration.getClusterPassword()))
{
- log.warn("It has been detected that the cluster admin user and password
which are used to " + "replicate management operation from one node to the other
have not been changed from the installation default. "
- + "Please see the HornetQ user guide for instructions on how to do
this.");
+ log.warn("Security risk! It has been detected that the cluster admin user
and password "
+ + "have not been changed from the installation default. "
+ + "Please see the HornetQ user guide, cluster chapter, for
instructions on how to do this.");
}
securityStore = new SecurityStoreImpl(securityRepository,
@@ -1059,7 +1055,6 @@
configuration.isWildcardRoutingEnabled(),
configuration.getIDCacheSize(),
configuration.isPersistIDCache(),
- executorFactory,
addressSettingsRepository);
messagingServerControl = managementService.registerServer(postOffice,
Modified: trunk/src/main/org/hornetq/core/server/impl/LastValueQueue.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/LastValueQueue.java 2010-01-11 16:04:54
UTC (rev 8789)
+++ trunk/src/main/org/hornetq/core/server/impl/LastValueQueue.java 2010-01-11 18:07:20
UTC (rev 8790)
@@ -14,6 +14,7 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import org.hornetq.api.core.Message;
@@ -53,7 +54,8 @@
final ScheduledExecutorService scheduledExecutor,
final PostOffice postOffice,
final StorageManager storageManager,
- final HierarchicalRepository<AddressSettings>
addressSettingsRepository)
+ final HierarchicalRepository<AddressSettings>
addressSettingsRepository,
+ final Executor executor)
{
super(persistenceID,
address,
@@ -64,7 +66,8 @@
scheduledExecutor,
postOffice,
storageManager,
- addressSettingsRepository);
+ addressSettingsRepository,
+ executor);
}
@Override
Modified: trunk/src/main/org/hornetq/core/server/impl/QueueFactoryImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueFactoryImpl.java 2010-01-11 16:04:54
UTC (rev 8789)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueFactoryImpl.java 2010-01-11 18:07:20
UTC (rev 8790)
@@ -43,6 +43,8 @@
private PostOffice postOffice;
private final StorageManager storageManager;
+
+ private final ExecutorFactory executorFactory;
public QueueFactoryImpl(final ExecutorFactory executorFactory,
final ScheduledExecutorService scheduledExecutor,
@@ -54,6 +56,8 @@
this.scheduledExecutor = scheduledExecutor;
this.storageManager = storageManager;
+
+ this.executorFactory = executorFactory;
}
public void setPostOffice(final PostOffice postOffice)
@@ -82,7 +86,8 @@
scheduledExecutor,
postOffice,
storageManager,
- addressSettingsRepository);
+ addressSettingsRepository,
+ executorFactory.getExecutor());
}
else
{
@@ -95,7 +100,8 @@
scheduledExecutor,
postOffice,
storageManager,
- addressSettingsRepository);
+ addressSettingsRepository,
+ executorFactory.getExecutor());
}
return queue;
Modified: trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2010-01-11 16:04:54 UTC
(rev 8789)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2010-01-11 18:07:20 UTC
(rev 8790)
@@ -127,7 +127,7 @@
private int pos;
- private final boolean dontAdd;
+ private final Executor executor;
public QueueImpl(final long id,
final SimpleString address,
@@ -138,7 +138,8 @@
final ScheduledExecutorService scheduledExecutor,
final PostOffice postOffice,
final StorageManager storageManager,
- final HierarchicalRepository<AddressSettings>
addressSettingsRepository)
+ final HierarchicalRepository<AddressSettings>
addressSettingsRepository,
+ final Executor executor)
{
this.id = id;
@@ -173,7 +174,7 @@
expiryAddress = null;
}
- dontAdd = System.getProperty("org.hornetq.opt.dontadd") != null;
+ this.executor = executor;
}
// Bindable implementation
-------------------------------------------------------------------------------------
@@ -235,7 +236,7 @@
add(ref, true);
}
- public void deliverAsync(final Executor executor)
+ public void deliverAsync()
{
// Prevent too many executors running at once
@@ -244,7 +245,12 @@
executor.execute(deliverRunner);
}
}
-
+
+ public Executor getExecutor()
+ {
+ return executor;
+ }
+
public synchronized void deliverNow()
{
deliverRunner.run();
@@ -295,7 +301,7 @@
return removed;
}
- public synchronized void addRedistributor(final long delay, final Executor executor)
+ public synchronized void addRedistributor(final long delay)
{
if (future != null)
{
@@ -307,7 +313,7 @@
if (redistributor != null)
{
// Just prompt delivery
- deliverAsync(executor);
+ deliverAsync();
}
if (delay > 0)
@@ -919,7 +925,7 @@
redistributor.start();
- deliverAsync(executor);
+ deliverAsync();
}
}
@@ -1284,11 +1290,6 @@
protected synchronized void add(final MessageReference ref, final boolean first)
{
- if (dontAdd)
- {
- return;
- }
-
if (!first)
{
messagesAdded.incrementAndGet();
@@ -1335,7 +1336,7 @@
// We have consumers with filters which don't match, so we need
// to prompt delivery every time
// a new message arrives
- deliver();
+ deliverAsync();
}
}
}
@@ -1424,7 +1425,7 @@
add(ref, true);
}
- deliver();
+ deliverAsync();
}
}
@@ -1549,7 +1550,7 @@
{
paused = false;
- deliver();
+ deliverAsync();
}
public synchronized boolean isPaused()
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2010-01-11
16:04:54 UTC (rev 8789)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2010-01-11
18:07:20 UTC (rev 8790)
@@ -136,7 +136,6 @@
final Channel channel,
final boolean preAcknowledge,
final boolean strictUpdateDeliveryCount,
- final Executor executor,
final ManagementService managementService) throws Exception
{
@@ -150,7 +149,7 @@
messageQueue = binding.getQueue();
- this.executor = executor;
+ this.executor = messageQueue.getExecutor();
this.started = browseOnly || started;
@@ -598,7 +597,7 @@
}
else
{
- session.promptDelivery(messageQueue);
+ messageQueue.deliverAsync();
}
}
}
@@ -660,7 +659,8 @@
else
{
// prompt Delivery only if chunk was finished
- session.promptDelivery(messageQueue);
+
+ messageQueue.deliverAsync();
}
}
}
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2010-01-11 16:04:54
UTC (rev 8789)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2010-01-11 18:07:20
UTC (rev 8790)
@@ -22,7 +22,6 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executor;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
@@ -137,8 +136,6 @@
private final Map<Long, ServerConsumer> consumers = new
ConcurrentHashMap<Long, ServerConsumer>();
- private final Executor executor;
-
private Transaction tx;
private final StorageManager storageManager;
@@ -190,7 +187,6 @@
final PostOffice postOffice,
final ResourceManager resourceManager,
final SecurityStore securityStore,
- final Executor executor,
final Channel channel,
final ManagementService managementService,
final HornetQServer server,
@@ -220,8 +216,6 @@
this.securityStore = securityStore;
- this.executor = executor;
-
if (!xa)
{
tx = new TransactionImpl(storageManager);
@@ -338,11 +332,6 @@
}
}
- public void promptDelivery(final Queue queue)
- {
- queue.deliverAsync(executor);
- }
-
public void handleCreateConsumer(final SessionCreateConsumerMessage packet)
{
SimpleString name = packet.getQueueName();
@@ -376,7 +365,6 @@
channel,
preAcknowledge,
strictUpdateDeliveryCount,
- executor,
managementService);
consumers.put(consumer.getID(), consumer);
Modified: trunk/tests/jms-tests/src/org/hornetq/jms/tests/BrowserTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/hornetq/jms/tests/BrowserTest.java 2010-01-11 16:04:54
UTC (rev 8789)
+++ trunk/tests/jms-tests/src/org/hornetq/jms/tests/BrowserTest.java 2010-01-11 18:07:20
UTC (rev 8790)
@@ -47,6 +47,7 @@
// Public
---------------------------------------------------------------------------------------
+
public void testCreateBrowserOnNullDestination() throws Exception
{
Connection conn = null;
Modified: trunk/tests/jms-tests/src/org/hornetq/jms/tests/MiscellaneousTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/hornetq/jms/tests/MiscellaneousTest.java 2010-01-11
16:04:54 UTC (rev 8789)
+++ trunk/tests/jms-tests/src/org/hornetq/jms/tests/MiscellaneousTest.java 2010-01-11
18:07:20 UTC (rev 8790)
@@ -55,6 +55,7 @@
// Public --------------------------------------------------------
+
public void testBrowser() throws Exception
{
Connection conn = null;
Modified: trunk/tests/src/org/hornetq/tests/integration/client/ReceiveImmediateTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/client/ReceiveImmediateTest.java 2010-01-11
16:04:54 UTC (rev 8789)
+++
trunk/tests/src/org/hornetq/tests/integration/client/ReceiveImmediateTest.java 2010-01-11
18:07:20 UTC (rev 8790)
@@ -65,7 +65,7 @@
{
doConsumerReceiveImmediateWithNoMessages(false);
}
-
+
public void testConsumerReceiveImmediate() throws Exception
{
doConsumerReceiveImmediate(false);
Modified:
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/ReplicatedLargeMessageFailoverTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/ReplicatedLargeMessageFailoverTest.java 2010-01-11
16:04:54 UTC (rev 8789)
+++
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/ReplicatedLargeMessageFailoverTest.java 2010-01-11
18:07:20 UTC (rev 8790)
@@ -13,6 +13,8 @@
package org.hornetq.tests.integration.cluster.failover;
+import org.hornetq.core.logging.Logger;
+
/**
* A ReplicatedLargeMessageFailoverTest
*
@@ -22,6 +24,7 @@
*/
public class ReplicatedLargeMessageFailoverTest extends LargeMessageFailoverTest
{
+ private static final Logger log =
Logger.getLogger(ReplicatedLargeMessageFailoverTest.class);
// Constants -----------------------------------------------------
@@ -31,6 +34,7 @@
// Constructors --------------------------------------------------
+
public ReplicatedLargeMessageFailoverTest()
{
super();
Modified: trunk/tests/src/org/hornetq/tests/timing/core/server/impl/QueueImplTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/timing/core/server/impl/QueueImplTest.java 2010-01-11
16:04:54 UTC (rev 8789)
+++
trunk/tests/src/org/hornetq/tests/timing/core/server/impl/QueueImplTest.java 2010-01-11
18:07:20 UTC (rev 8790)
@@ -15,6 +15,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -84,7 +85,8 @@
scheduledExecutor,
null,
null,
- null);
+ null,
+ Executors.newSingleThreadExecutor());
// Send one scheduled
@@ -158,7 +160,8 @@
scheduledExecutor,
null,
null,
- null);
+ null,
+ Executors.newSingleThreadExecutor());
FakeConsumer consumer = null;
@@ -273,7 +276,8 @@
scheduledExecutor,
null,
null,
- null);
+ null,
+ Executors.newSingleThreadExecutor());
MessageReference messageReference = generateReference(queue, 1);
queue.addConsumer(consumer);
messageReference.setScheduledDeliveryTime(System.currentTimeMillis() + 2000);
Modified: trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java 2010-01-11
16:04:54 UTC (rev 8789)
+++ trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java 2010-01-11
18:07:20 UTC (rev 8790)
@@ -82,9 +82,9 @@
}
/* (non-Javadoc)
- * @see org.hornetq.core.server.Queue#addRedistributor(long,
java.util.concurrent.Executor)
+ * @see org.hornetq.core.server.Queue#addRedistributor(long)
*/
- public void addRedistributor(final long delay, final Executor executor)
+ public void addRedistributor(final long delay)
{
// TODO Auto-generated method stub
@@ -172,9 +172,9 @@
}
/* (non-Javadoc)
- * @see org.hornetq.core.server.Queue#deliverAsync(java.util.concurrent.Executor)
+ * @see org.hornetq.core.server.Queue#deliverAsync()
*/
- public void deliverAsync(final Executor executor)
+ public void deliverAsync()
{
// TODO Auto-generated method stub
@@ -520,4 +520,10 @@
return false;
}
+ public Executor getExecutor()
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
}
\ No newline at end of file
Modified: trunk/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java 2010-01-11
16:04:54 UTC (rev 8789)
+++ trunk/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java 2010-01-11
18:07:20 UTC (rev 8790)
@@ -18,6 +18,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -35,6 +36,7 @@
import org.hornetq.tests.unit.core.server.impl.fakes.FakeFilter;
import org.hornetq.tests.unit.core.server.impl.fakes.FakePostOffice;
import org.hornetq.tests.util.UnitTestCase;
+import org.hornetq.utils.Future;
/**
* A QueueTest
@@ -46,18 +48,22 @@
// The tests ----------------------------------------------------------------
private ScheduledExecutorService scheduledExecutor;
+
+ private ExecutorService executor;
@Override
protected void setUp() throws Exception
{
super.setUp();
scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
+ executor = Executors.newSingleThreadExecutor();
}
@Override
protected void tearDown() throws Exception
{
- scheduledExecutor.shutdown();
+ scheduledExecutor.shutdownNow();
+ executor.shutdownNow();
super.tearDown();
}
@@ -70,15 +76,16 @@
final SimpleString name = new SimpleString("oobblle");
QueueImpl queue = new QueueImpl(1,
- QueueImplTest.address1,
- name,
- null,
- false,
- true,
- scheduledExecutor,
- null,
- null,
- null);
+ QueueImplTest.address1,
+ name,
+ null,
+ false,
+ true,
+ scheduledExecutor,
+ null,
+ null,
+ null,
+ executor);
Assert.assertEquals(name, queue.getName());
}
@@ -86,15 +93,16 @@
public void testDurable()
{
QueueImpl queue = new QueueImpl(1,
- QueueImplTest.address1,
- QueueImplTest.queue1,
- null,
- false,
- false,
- scheduledExecutor,
- null,
- null,
- null);
+ QueueImplTest.address1,
+ QueueImplTest.queue1,
+ null,
+ false,
+ false,
+ scheduledExecutor,
+ null,
+ null,
+ null,
+ executor);
Assert.assertFalse(queue.isDurable());
@@ -107,7 +115,8 @@
scheduledExecutor,
null,
null,
- null);
+ null,
+ executor);
Assert.assertTrue(queue.isDurable());
}
@@ -121,15 +130,16 @@
Consumer cons3 = new FakeConsumer();
QueueImpl queue = new QueueImpl(1,
- QueueImplTest.address1,
- QueueImplTest.queue1,
- null,
- false,
- true,
- scheduledExecutor,
- null,
- null,
- null);
+ QueueImplTest.address1,
+ QueueImplTest.queue1,
+ null,
+ false,
+ true,
+ scheduledExecutor,
+ null,
+ null,
+ null,
+ executor);
Assert.assertEquals(0, queue.getConsumerCount());
@@ -171,15 +181,16 @@
public void testGetFilter()
{
QueueImpl queue = new QueueImpl(1,
- QueueImplTest.address1,
- QueueImplTest.queue1,
- null,
- false,
- true,
- scheduledExecutor,
- null,
- null,
- null);
+ QueueImplTest.address1,
+ QueueImplTest.queue1,
+ null,
+ false,
+ true,
+ scheduledExecutor,
+ null,
+ null,
+ null,
+ executor);
Assert.assertNull(queue.getFilter());
@@ -205,7 +216,8 @@
scheduledExecutor,
null,
null,
- null);
+ null,
+ executor);
Assert.assertEquals(filter, queue.getFilter());
@@ -214,15 +226,16 @@
public void testSimpleadd()
{
QueueImpl queue = new QueueImpl(1,
- QueueImplTest.address1,
- QueueImplTest.queue1,
- null,
- false,
- true,
- scheduledExecutor,
- null,
- null,
- null);
+ QueueImplTest.address1,
+ QueueImplTest.queue1,
+ null,
+ false,
+ true,
+ scheduledExecutor,
+ null,
+ null,
+ null,
+ executor);
final int numMessages = 10;
@@ -242,15 +255,16 @@
public void testSimpleDirectDelivery() throws Exception
{
QueueImpl queue = new QueueImpl(1,
- QueueImplTest.address1,
- QueueImplTest.queue1,
- null,
- false,
- true,
- scheduledExecutor,
- null,
- null,
- null);
+ QueueImplTest.address1,
+ QueueImplTest.queue1,
+ null,
+ false,
+ true,
+ scheduledExecutor,
+ null,
+ null,
+ null,
+ executor);
FakeConsumer consumer = new FakeConsumer();
@@ -279,15 +293,16 @@
public void testSimpleNonDirectDelivery() throws Exception
{
QueueImpl queue = new QueueImpl(1,
- QueueImplTest.address1,
- QueueImplTest.queue1,
- null,
- false,
- true,
- scheduledExecutor,
- null,
- null,
- null);
+ QueueImplTest.address1,
+ QueueImplTest.queue1,
+ null,
+ false,
+ true,
+ scheduledExecutor,
+ null,
+ null,
+ null,
+ executor);
final int numMessages = 10;
@@ -326,15 +341,16 @@
public void testBusyConsumer() throws Exception
{
QueueImpl queue = new QueueImpl(1,
- QueueImplTest.address1,
- QueueImplTest.queue1,
- null,
- false,
- true,
- scheduledExecutor,
- null,
- null,
- null);
+ QueueImplTest.address1,
+ QueueImplTest.queue1,
+ null,
+ false,
+ true,
+ scheduledExecutor,
+ null,
+ null,
+ null,
+ executor);
FakeConsumer consumer = new FakeConsumer();
@@ -379,15 +395,16 @@
public void testBusyConsumerThenAddMoreMessages() throws Exception
{
QueueImpl queue = new QueueImpl(1,
- QueueImplTest.address1,
- QueueImplTest.queue1,
- null,
- false,
- true,
- scheduledExecutor,
- null,
- null,
- null);
+ QueueImplTest.address1,
+ QueueImplTest.queue1,
+ null,
+ false,
+ true,
+ scheduledExecutor,
+ null,
+ null,
+ null,
+ executor);
FakeConsumer consumer = new FakeConsumer();
@@ -455,15 +472,16 @@
public void testAddFirstadd() throws Exception
{
QueueImpl queue = new QueueImpl(1,
- QueueImplTest.address1,
- QueueImplTest.queue1,
- null,
- false,
- true,
- scheduledExecutor,
- null,
- null,
- null);
+ QueueImplTest.address1,
+ QueueImplTest.queue1,
+ null,
+ false,
+ true,
+ scheduledExecutor,
+ null,
+ null,
+ null,
+ executor);
final int numMessages = 10;
@@ -518,15 +536,16 @@
public void testChangeConsumersAndDeliver() throws Exception
{
QueueImpl queue = new QueueImpl(1,
- QueueImplTest.address1,
- QueueImplTest.queue1,
- null,
- false,
- true,
- scheduledExecutor,
- new FakePostOffice(),
- null,
- null);
+ QueueImplTest.address1,
+ QueueImplTest.queue1,
+ null,
+ false,
+ true,
+ scheduledExecutor,
+ new FakePostOffice(),
+ null,
+ null,
+ executor);
final int numMessages = 10;
@@ -681,15 +700,16 @@
public void testConsumerReturningNull() throws Exception
{
QueueImpl queue = new QueueImpl(1,
- QueueImplTest.address1,
- QueueImplTest.queue1,
- null,
- false,
- true,
- scheduledExecutor,
- null,
- null,
- null);
+ QueueImplTest.address1,
+ QueueImplTest.queue1,
+ null,
+ false,
+ true,
+ scheduledExecutor,
+ null,
+ null,
+ null,
+ executor);
class NullConsumer implements Consumer
{
@@ -723,15 +743,16 @@
public void testRoundRobinWithQueueing() throws Exception
{
QueueImpl queue = new QueueImpl(1,
- QueueImplTest.address1,
- QueueImplTest.queue1,
- null,
- false,
- true,
- scheduledExecutor,
- null,
- null,
- null);
+ QueueImplTest.address1,
+ QueueImplTest.queue1,
+ null,
+ false,
+ true,
+ scheduledExecutor,
+ null,
+ null,
+ null,
+ executor);
final int numMessages = 10;
@@ -775,15 +796,16 @@
public void testRoundRobinDirect() throws Exception
{
QueueImpl queue = new QueueImpl(1,
- QueueImplTest.address1,
- QueueImplTest.queue1,
- null,
- false,
- true,
- scheduledExecutor,
- null,
- null,
- null);
+ QueueImplTest.address1,
+ QueueImplTest.queue1,
+ null,
+ false,
+ true,
+ scheduledExecutor,
+ null,
+ null,
+ null,
+ executor);
final int numMessages = 10;
@@ -825,15 +847,16 @@
public void testWithPriorities() throws Exception
{
QueueImpl queue = new QueueImpl(1,
- QueueImplTest.address1,
- QueueImplTest.queue1,
- null,
- false,
- true,
- scheduledExecutor,
- null,
- null,
- null);
+ QueueImplTest.address1,
+ QueueImplTest.queue1,
+ null,
+ false,
+ true,
+ scheduledExecutor,
+ null,
+ null,
+ null,
+ executor);
final int numMessages = 10;
@@ -901,15 +924,16 @@
public void testConsumerWithFilterAddAndRemove()
{
QueueImpl queue = new QueueImpl(1,
- QueueImplTest.address1,
- QueueImplTest.queue1,
- null,
- false,
- true,
- scheduledExecutor,
- null,
- null,
- null);
+ QueueImplTest.address1,
+ QueueImplTest.queue1,
+ null,
+ false,
+ true,
+ scheduledExecutor,
+ null,
+ null,
+ null,
+ executor);
Filter filter = new FakeFilter("fruit", "orange");
@@ -919,15 +943,16 @@
public void testIterator()
{
QueueImpl queue = new QueueImpl(1,
- QueueImplTest.address1,
- QueueImplTest.queue1,
- null,
- false,
- true,
- scheduledExecutor,
- null,
- null,
- null);
+ QueueImplTest.address1,
+ QueueImplTest.queue1,
+ null,
+ false,
+ true,
+ scheduledExecutor,
+ null,
+ null,
+ null,
+ executor);
final int numMessages = 20;
@@ -952,19 +977,30 @@
}
assertRefListsIdenticalRefs(refs, list);
}
+
+ private void awaitExecution()
+ {
+ Future future = new Future();
+
+ executor.execute(future);
+
+ future.await(10000);
+ }
public void testConsumeWithFiltersAddAndRemoveConsumer() throws Exception
{
+
QueueImpl queue = new QueueImpl(1,
- QueueImplTest.address1,
- QueueImplTest.queue1,
- null,
- false,
- true,
- scheduledExecutor,
- new FakePostOffice(),
- null,
- null);
+ QueueImplTest.address1,
+ QueueImplTest.queue1,
+ null,
+ false,
+ true,
+ scheduledExecutor,
+ new FakePostOffice(),
+ null,
+ null,
+ executor);
Filter filter = new FakeFilter("fruit", "orange");
@@ -989,6 +1025,8 @@
refs.add(ref2);
Assert.assertEquals(2, queue.getMessageCount());
+
+ awaitExecution();;
Assert.assertEquals(1, consumer.getReferences().size());
@@ -1023,6 +1061,8 @@
refs.add(ref4);
Assert.assertEquals(3, queue.getMessageCount());
+
+ awaitExecution();;
Assert.assertEquals(1, consumer.getReferences().size());
@@ -1034,15 +1074,16 @@
public void testBusyConsumerWithFilterFirstCallBusy() throws Exception
{
QueueImpl queue = new QueueImpl(1,
- QueueImplTest.address1,
- QueueImplTest.queue1,
- null,
- false,
- true,
- scheduledExecutor,
- null,
- null,
- null);
+ QueueImplTest.address1,
+ QueueImplTest.queue1,
+ null,
+ false,
+ true,
+ scheduledExecutor,
+ null,
+ null,
+ null,
+ executor);
FakeConsumer consumer = new FakeConsumer(FilterImpl.createFilter("color = 'green'"));
@@ -1084,15 +1125,16 @@
public void testBusyConsumerWithFilterThenAddMoreMessages() throws Exception
{
QueueImpl queue = new QueueImpl(1,
- QueueImplTest.address1,
- QueueImplTest.queue1,
- null,
- false,
- true,
- scheduledExecutor,
- null,
- null,
- null);
+ QueueImplTest.address1,
+ QueueImplTest.queue1,
+ null,
+ false,
+ true,
+ scheduledExecutor,
+ null,
+ null,
+ null,
+ executor);
FakeConsumer consumer = new FakeConsumer(FilterImpl.createFilter("color = 'green'"));
@@ -1167,15 +1209,16 @@
public void testConsumerWithFilterThenAddMoreMessages() throws Exception
{
QueueImpl queue = new QueueImpl(1,
- QueueImplTest.address1,
- QueueImplTest.queue1,
- null,
- false,
- true,
- scheduledExecutor,
- null,
- null,
- null);
+ QueueImplTest.address1,
+ QueueImplTest.queue1,
+ null,
+ false,
+ true,
+ scheduledExecutor,
+ null,
+ null,
+ null,
+ executor);
final int numMessages = 10;
List<MessageReference> refs = new ArrayList<MessageReference>();
@@ -1240,15 +1283,16 @@
private void testConsumerWithFilters(final boolean direct) throws Exception
{
QueueImpl queue = new QueueImpl(1,
- QueueImplTest.address1,
- QueueImplTest.queue1,
- null,
- false,
- true,
- scheduledExecutor,
- new FakePostOffice(),
- null,
- null);
+ QueueImplTest.address1,
+ QueueImplTest.queue1,
+ null,
+ false,
+ true,
+ scheduledExecutor,
+ new FakePostOffice(),
+ null,
+ null,
+ executor);
Filter filter = new FakeFilter("fruit", "orange");
@@ -1309,6 +1353,8 @@
}
Assert.assertEquals(6, queue.getMessageCount());
+
+ awaitExecution();;
Assert.assertEquals(2, consumer.getReferences().size());
@@ -1338,15 +1384,16 @@
{
FakeConsumer consumer = new FakeConsumer();
QueueImpl queue = new QueueImpl(1,
- QueueImplTest.address1,
- QueueImplTest.queue1,
- null,
- false,
- true,
- scheduledExecutor,
- null,
- null,
- null);
+ QueueImplTest.address1,
+ QueueImplTest.queue1,
+ null,
+ false,
+ true,
+ scheduledExecutor,
+ null,
+ null,
+ null,
+ executor);
MessageReference messageReference = generateReference(queue, 1);
MessageReference messageReference2 = generateReference(queue, 2);
MessageReference messageReference3 = generateReference(queue, 3);
@@ -1367,15 +1414,16 @@
public void testMessagesAdded() throws Exception
{
QueueImpl queue = new QueueImpl(1,
- QueueImplTest.address1,
- QueueImplTest.queue1,
- null,
- false,
- true,
- scheduledExecutor,
- null,
- null,
- null);
+ QueueImplTest.address1,
+ QueueImplTest.queue1,
+ null,
+ false,
+ true,
+ scheduledExecutor,
+ null,
+ null,
+ null,
+ executor);
MessageReference messageReference = generateReference(queue, 1);
MessageReference messageReference2 = generateReference(queue, 2);
MessageReference messageReference3 = generateReference(queue, 3);
@@ -1388,15 +1436,16 @@
public void testGetReference() throws Exception
{
QueueImpl queue = new QueueImpl(1,
- QueueImplTest.address1,
- QueueImplTest.queue1,
- null,
- false,
- true,
- scheduledExecutor,
- null,
- null,
- null);
+ QueueImplTest.address1,
+ QueueImplTest.queue1,
+ null,
+ false,
+ true,
+ scheduledExecutor,
+ null,
+ null,
+ null,
+ executor);
MessageReference messageReference = generateReference(queue, 1);
MessageReference messageReference2 = generateReference(queue, 2);
MessageReference messageReference3 = generateReference(queue, 3);
@@ -1410,15 +1459,16 @@
public void testGetNonExistentReference() throws Exception
{
QueueImpl queue = new QueueImpl(1,
- QueueImplTest.address1,
- QueueImplTest.queue1,
- null,
- false,
- true,
- scheduledExecutor,
- null,
- null,
- null);
+ QueueImplTest.address1,
+ QueueImplTest.queue1,
+ null,
+ false,
+ true,
+ scheduledExecutor,
+ null,
+ null,
+ null,
+ executor);
MessageReference messageReference = generateReference(queue, 1);
MessageReference messageReference2 = generateReference(queue, 2);
MessageReference messageReference3 = generateReference(queue, 3);
@@ -1436,15 +1486,16 @@
public void testPauseAndResumeWithAsync() throws Exception
{
QueueImpl queue = new QueueImpl(1,
- QueueImplTest.address1,
- QueueImplTest.queue1,
- null,
- false,
- true,
- scheduledExecutor,
- null,
- null,
- null);
+ QueueImplTest.address1,
+ QueueImplTest.queue1,
+ null,
+ false,
+ true,
+ scheduledExecutor,
+ null,
+ null,
+ null,
+ executor);
// pauses the queue
queue.pause();
@@ -1483,6 +1534,8 @@
Assert.assertEquals(0, queue.getDeliveringCount());
// resuming work
queue.resume();
+
+ awaitExecution();;
// after resuming the delivery begins.
assertRefListsIdenticalRefs(refs, consumer.getReferences());
@@ -1500,15 +1553,16 @@
public void testPauseAndResumeWithDirect() throws Exception
{
QueueImpl queue = new QueueImpl(1,
- QueueImplTest.address1,
- QueueImplTest.queue1,
- null,
- false,
- true,
- scheduledExecutor,
- null,
- null,
- null);
+ QueueImplTest.address1,
+ QueueImplTest.queue1,
+ null,
+ false,
+ true,
+ scheduledExecutor,
+ null,
+ null,
+ null,
+ executor);
// Now add a consumer
FakeConsumer consumer = new FakeConsumer();
@@ -1538,6 +1592,10 @@
// brings the queue to resumed state.
queue.resume();
+
+
+ awaitExecution();;
+
// resuming delivery of messages
assertRefListsIdenticalRefs(refs, consumer.getReferences());
Assert.assertEquals(numMessages, queue.getMessageCount());
Modified: trunk/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakeQueueFactory.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakeQueueFactory.java 2010-01-11 16:04:54 UTC (rev 8789)
+++ trunk/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakeQueueFactory.java 2010-01-11 18:07:20 UTC (rev 8790)
@@ -52,6 +52,7 @@
scheduledExecutor,
postOffice,
null,
+ null,
null);
}